本篇介绍
本篇接着<<Android 音频低延时mmap介绍(1)>>继续介绍aaudio 的mmap机制,本篇旨在揭示mmap机制中的数据同步。
aaudio mmap介绍
故事还是获取mmap buffer开始:
aaudio_result_t AAudioServiceEndpointMMAP::createMmapBuffer(
android::base::unique_fd* fileDescriptor)
{
memset(&mMmapBufferinfo, 0, sizeof(struct audio_mmap_buffer_info));
int32_t minSizeFrames = getBufferCapacity();
if (minSizeFrames <= 0) { // zero will get rejected
minSizeFrames = AAUDIO_BUFFER_CAPACITY_MIN;
}
status_t status = mMmapStream->createMmapBuffer(minSizeFrames, &mMmapBufferinfo);
bool isBufferShareable = mMmapBufferinfo.flags & AUDIO_MMAP_APPLICATION_SHAREABLE;
if (status != OK) {
ALOGE("%s() - createMmapBuffer() failed with status %d %s",
__func__, status, strerror(-status));
return AAUDIO_ERROR_UNAVAILABLE;
} else {
ALOGD("%s() createMmapBuffer() buffer_size = %d fr, burst_size %d fr"
", Sharable FD: %s",
__func__,
mMmapBufferinfo.buffer_size_frames,
mMmapBufferinfo.burst_size_frames,
isBufferShareable ? "Yes" : "No");
}
setBufferCapacity(mMmapBufferinfo.buffer_size_frames);
if (!isBufferShareable) {
// Exclusive mode can only be used by the service because the FD cannot be shared.
int32_t audioServiceUid =
VALUE_OR_FATAL(legacy2aidl_uid_t_int32_t(getuid()));
if ((mMmapClient.attributionSource.uid != audioServiceUid) &&
getSharingMode() == AAUDIO_SHARING_MODE_EXCLUSIVE) {
ALOGW("%s() - exclusive FD cannot be used by client", __func__);
return AAUDIO_ERROR_UNAVAILABLE;
}
}
// AAudio creates a copy of this FD and retains ownership of the copy.
// Assume that AudioFlinger will close the original shared_memory_fd.
fileDescriptor->reset(dup(mMmapBufferinfo.shared_memory_fd));
if (fileDescriptor->get() == -1) {
ALOGE("%s() - could not dup shared_memory_fd", __func__);
return AAUDIO_ERROR_INTERNAL;
}
// Call to HAL to make sure the transport FD was able to be closed by binder.
// This is a tricky workaround for a problem in Binder.
// TODO:[b/192048842] When that problem is fixed we may be able to remove or change this code.
struct audio_mmap_position position;
mMmapStream->getMmapPosition(&position);
mFramesPerBurst = mMmapBufferinfo.burst_size_frames;
return AAUDIO_OK;
}
从上一篇我们看到createMmapBuffer之后,就拿到了和驱动共享的内存地址,大小等信息,具体信息在mMmapBufferinfo中:
/** @TODO export from .hal */
typedef enum {
NONE = 0x0,
/**
* Only set this flag if applications can access the audio buffer memory
* shared with the backend (usually DSP) _without_ security issue.
*
* Setting this flag also implies that Binder will allow passing the shared memory FD
* to applications.
*
* That usually implies that the kernel will prevent any access to the
* memory surrounding the audio buffer as it could lead to a security breach.
*
* For example, a "/dev/snd/" file descriptor generally is not shareable,
* but an "anon_inode:dmabuffer" file descriptor is shareable.
* See also Linux kernel's dma_buf.
*
* This flag is required to support AAudio exclusive mode:
* See: https://source.android.com/devices/audio/aaudio
*/
AUDIO_MMAP_APPLICATION_SHAREABLE = 0x1,
} audio_mmap_buffer_flag;
/**
* Mmap buffer descriptor returned by audio_stream->create_mmap_buffer().
* note\ Used by streams opened in mmap mode.
*/
struct audio_mmap_buffer_info {
void* shared_memory_address; /**< base address of mmap memory buffer.
For use by local process only */
int32_t shared_memory_fd; /**< FD for mmap memory buffer */
int32_t buffer_size_frames; /**< total buffer size in frames */
int32_t burst_size_frames; /**< transfer size granularity in frames */
audio_mmap_buffer_flag flags; /**< Attributes describing the buffer. */
};
这儿看到了AUDIO_MMAP_APPLICATION_SHAREABLE标记,这个是独占模式使用的。后面我们也能清晰看到独占和共享模式的差异。
从上面也能看到,如果内存没有AUDIO_MMAP_APPLICATION_SHAREABLE标记,那么独占模式就失败了。
接下来需要保存fd,用来通过IPC共享fd实现应用与驱动共享内存。
到了这儿共享内存,fd信息还在audioserver中,那如何让应用共享呢?又如何驱动应用读取采集数据呢?接下来我们用start中寻找答案。
故事的开始是AAudioStream_requestStart:
AAUDIO_API aaudio_result_t AAudioStream_requestStart(AAudioStream* stream)
{
AudioStream *audioStream = convertAAudioStreamToAudioStream(stream);
aaudio_stream_id_t id = audioStream->getId();
ALOGD("%s(s#%u) called --------------", __func__, id);
aaudio_result_t result = audioStream->systemStart();
ALOGD("%s(s#%u) returned %d ---------", __func__, id, result);
return result;
}
这时候是业务请求启动aaudio 流, 接着往下看:
aaudio_result_t AudioStream::systemStart() {
if (collidesWithCallback()) {
ALOGE("%s cannot be called from a callback!", __func__);
return AAUDIO_ERROR_INVALID_STATE;
}
std::lock_guard<std::mutex> lock(mStreamLock);
switch (getState()) {
// Is this a good time to start?
case AAUDIO_STREAM_STATE_OPEN:
case AAUDIO_STREAM_STATE_PAUSING:
case AAUDIO_STREAM_STATE_PAUSED:
case AAUDIO_STREAM_STATE_STOPPING:
case AAUDIO_STREAM_STATE_STOPPED:
case AAUDIO_STREAM_STATE_FLUSHING:
case AAUDIO_STREAM_STATE_FLUSHED:
break; // Proceed with starting.
// Already started?
case AAUDIO_STREAM_STATE_STARTING:
case AAUDIO_STREAM_STATE_STARTED:
ALOGW("%s() stream was already started, state = %s", __func__,
AudioGlobal_convertStreamStateToText(getState()));
return AAUDIO_ERROR_INVALID_STATE;
// Don't start when the stream is dead!
case AAUDIO_STREAM_STATE_DISCONNECTED:
case AAUDIO_STREAM_STATE_CLOSING:
case AAUDIO_STREAM_STATE_CLOSED:
default:
ALOGW("%s() stream is dead, state = %s", __func__,
AudioGlobal_convertStreamStateToText(getState()));
return AAUDIO_ERROR_INVALID_STATE;
}
aaudio_result_t result = requestStart_l();
if (result == AAUDIO_OK) {
// We only call this for logging in "dumpsys audio". So ignore return code.
(void) mPlayerBase->startWithStatus(getDeviceId());
}
return result;
}
从上面可以看到如下信息:
- 在数据回调和异常回调里最好不要操作aaudio的起停接口,会有不可预期的问题。
- 在启动aaudio的时候也会通知下mPlayerBase。
这时候可能会有疑问,为什么需要通知mPlayerBase?
要回答这个问题,原因也比较直接,dumpsys audio会记录系统中所有采播的运行记录,这儿的mPlayerBase就是为了将该信息记录到audioserver中,这样dumpsys audio就可以看到aaudio的运行记录了。
接下来看下requestStart_l的内容:
aaudio_result_t AudioStreamInternal::requestStart_l()
{
int64_t startTime;
if (getServiceHandle() == AAUDIO_HANDLE_INVALID) {
ALOGD("requestStart() mServiceStreamHandle invalid");
return AAUDIO_ERROR_INVALID_STATE;
}
if (isActive()) {
ALOGD("requestStart() already active");
return AAUDIO_ERROR_INVALID_STATE;
}
if (isDisconnected()) {
ALOGD("requestStart() but DISCONNECTED");
return AAUDIO_ERROR_DISCONNECTED;
}
const aaudio_stream_state_t originalState = getState();
setState(AAUDIO_STREAM_STATE_STARTING);
// Clear any stale timestamps from the previous run.
drainTimestampsFromService();
prepareBuffersForStart(); // tell subclasses to get ready
aaudio_result_t result = mServiceInterface.startStream(mServiceStreamHandleInfo);
if (result == AAUDIO_ERROR_STANDBY) {
// The stream is at standby mode. Need to exit standby before starting the stream.
result = exitStandby_l();
if (result == AAUDIO_OK) {
result = mServiceInterface.startStream(mServiceStreamHandleInfo);
}
}
if (result != AAUDIO_OK) {
ALOGD("%s() error = %d, stream was probably stolen", __func__, result);
// Stealing was added in R. Coerce result to improve backward compatibility.
result = AAUDIO_ERROR_DISCONNECTED;
setDisconnected();
}
startTime = AudioClock::getNanoseconds();
mClockModel.start(startTime);
mNeedCatchUp.request(); // Ask data processing code to catch up when first timestamp received.
// Start data callback thread.
if (result == AAUDIO_OK && isDataCallbackSet()) {
// Launch the callback loop thread.
int64_t periodNanos = mCallbackFrames
* AAUDIO_NANOS_PER_SECOND
/ getSampleRate();
mCallbackEnabled.store(true);
result = createThread_l(periodNanos, aaudio_callback_thread_proc, this);
}
if (result != AAUDIO_OK) {
setState(originalState);
}
return result;
}
这儿主要做了几件事:
- 为启动流做准备工作,时间戳,buffer。
- 通知server端启动流
- 开始时间戳同步和启动回调线程
接下来就挨个看下,首先是setState:
void AudioStream::setState(aaudio_stream_state_t state) {
aaudio_stream_state_t oldState = mState.load();
ALOGD("%s(s#%d) from %d to %d", __func__, getId(), oldState, state);
if (state == oldState) {
return; // no change
}
LOG_ALWAYS_FATAL_IF(state == AAUDIO_STREAM_STATE_DISCONNECTED,
"Disconnected state must be separated from mState");
// CLOSED is a final state
if (oldState == AAUDIO_STREAM_STATE_CLOSED) {
ALOGW("%s(%d) tried to set to %d but already CLOSED", __func__, getId(), state);
// Once CLOSING, we can only move to CLOSED state.
} else if (oldState == AAUDIO_STREAM_STATE_CLOSING
&& state != AAUDIO_STREAM_STATE_CLOSED) {
ALOGW("%s(%d) tried to set to %d but already CLOSING", __func__, getId(), state);
} else {
mState.store(state);
// Wake up a wakeForStateChange thread if it exists.
syscall(SYS_futex, &mState, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
}
}
这儿不涉及IPC,只是更新当前进程内保存的状态,并且通过futex唤醒等待的线程。
futex 是fast userspace mutexes的缩写,通过在用户态内存方式维护一个锁状态变量,通过判断该变量来判断是否进入内核,可以提升同步操作的性能,比如不需要每次获取锁,释放锁操作都需要走系统调用。
接下来再看下drainTimestampsFromService:
aaudio_result_t AudioStreamInternal::drainTimestampsFromService() {
aaudio_result_t result = AAUDIO_OK;
while (result == AAUDIO_OK) {
AAudioServiceMessage message;
if (!mAudioEndpoint) {
break;
}
if (mAudioEndpoint->readUpCommand(&message) != 1) {
break; // no command this time, no problem
}
switch (message.what) {
// ignore most messages
case AAudioServiceMessage::code::TIMESTAMP_SERVICE:
case AAudioServiceMessage::code::TIMESTAMP_HARDWARE:
break;
case AAudioServiceMessage::code::EVENT:
result = onEventFromServer(&message);
break;
default:
ALOGE("%s - unrecognized message.what = %d", __func__, (int) message.what);
result = AAUDIO_ERROR_INTERNAL;
break;
}
}
return result;
}
这儿就是从mAudioEndpoint中读取命令然后进行解析,这个命令是从server端来的。这儿的关键调用有2个,一个是readUpCommand,这个是为了排空指令队列中的指令,另外一个是onEventFromServer,解析server来的指令,具体如下:
aaudio_result_t AudioStreamInternal::onEventFromServer(AAudioServiceMessage *message) {
aaudio_result_t result = AAUDIO_OK;
switch (message->event.event) {
case AAUDIO_SERVICE_EVENT_STARTED:
ALOGD("%s - got AAUDIO_SERVICE_EVENT_STARTED", __func__);
if (getState() == AAUDIO_STREAM_STATE_STARTING) {
setState(AAUDIO_STREAM_STATE_STARTED);
}
mPlayerBase->triggerPortIdUpdate(static_cast<audio_port_handle_t>(
message->event.dataLong));
break;
case AAUDIO_SERVICE_EVENT_PAUSED:
ALOGD("%s - got AAUDIO_SERVICE_EVENT_PAUSED", __func__);
if (getState() == AAUDIO_STREAM_STATE_PAUSING) {
setState(AAUDIO_STREAM_STATE_PAUSED);
}
break;
case AAUDIO_SERVICE_EVENT_STOPPED:
ALOGD("%s - got AAUDIO_SERVICE_EVENT_STOPPED", __func__);
if (getState() == AAUDIO_STREAM_STATE_STOPPING) {
setState(AAUDIO_STREAM_STATE_STOPPED);
}
break;
case AAUDIO_SERVICE_EVENT_FLUSHED:
ALOGD("%s - got AAUDIO_SERVICE_EVENT_FLUSHED", __func__);
if (getState() == AAUDIO_STREAM_STATE_FLUSHING) {
setState(AAUDIO_STREAM_STATE_FLUSHED);
onFlushFromServer();
}
break;
case AAUDIO_SERVICE_EVENT_DISCONNECTED:
// Prevent hardware from looping on old data and making buzzing sounds.
if (getDirection() == AAUDIO_DIRECTION_OUTPUT) {
mAudioEndpoint->eraseDataMemory();
}
result = AAUDIO_ERROR_DISCONNECTED;
setDisconnected();
ALOGW("%s - AAUDIO_SERVICE_EVENT_DISCONNECTED - FIFO cleared", __func__);
break;
case AAUDIO_SERVICE_EVENT_VOLUME:
ALOGD("%s - AAUDIO_SERVICE_EVENT_VOLUME %lf", __func__, message->event.dataDouble);
mStreamVolume = (float)message->event.dataDouble;
doSetVolume();
break;
case AAUDIO_SERVICE_EVENT_XRUN:
mXRunCount = static_cast<int32_t>(message->event.dataLong);
break;
default:
ALOGE("%s - Unrecognized event = %d", __func__, (int) message->event.event);
break;
}
return result;
}
这儿就是同步server端的状态。
看到这儿需要回顾下client也就是app 和 server 同步的方式,一种有2个共享内存,一个是命令队列,一个是数据队列。
在client open 流的时候有这样一个逻辑:
result = mServiceInterface.getStreamDescription(mServiceStreamHandleInfo, mEndPointParcelable);
if (result != AAUDIO_OK) {
goto error;
}
// Resolve parcelable into a descriptor.
result = mEndPointParcelable.resolve(&mEndpointDescriptor);
if (result != AAUDIO_OK) {
goto error;
}
// Configure endpoint based on descriptor.
mAudioEndpoint = std::make_unique<AudioEndpoint>();
result = mAudioEndpoint->configure(&mEndpointDescriptor, getDirection());
从server端拿到stream description后,配置mAudioEndpoint。那接下来看下getStreamDescription包含了哪些东西, 我们先猜一下,应该就是刚才提到的2个共享内存,一个同步指令,一个同步数据:
Status AAudioService::getStreamDescription(int32_t streamHandle, Endpoint* endpoint,
int32_t *_aidl_return) {
static_assert(std::is_same_v<aaudio_result_t, std::decay_t<typeof(*_aidl_return)>>);
const sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
if (serviceStream.get() == nullptr) {
ALOGE("getStreamDescription(), illegal stream handle = 0x%0x", streamHandle);
AIDL_RETURN(AAUDIO_ERROR_INVALID_HANDLE);
}
AudioEndpointParcelable endpointParcelable;
const aaudio_result_t result = serviceStream->getDescription(endpointParcelable);
if (result == AAUDIO_OK) {
*endpoint = std::move(endpointParcelable).parcelable();
}
AIDL_RETURN(result);
}
这个是server端的逻辑,具体在service stream中:
/**
* Get an immutable description of the in-memory queues
* used to communicate with the underlying HAL or Service.
*/
aaudio_result_t AAudioServiceStreamBase::getDescription(AudioEndpointParcelable &parcelable) {
return sendCommand(
GET_DESCRIPTION,
std::make_shared<GetDescriptionParam>(&parcelable),
true /*waitForReply*/,
TIMEOUT_NANOS);
}
这儿也是发一个同步的指令,可能有人会有疑问,这都是内部调用了,为什么还搞这么复杂,直接获取信息就好了吧?通过消息队列有一个好处是可以完全避免多线程问题,在这儿只考虑生产消费者就好了。这儿就是生产指令:
aaudio_result_t AAudioServiceStreamBase::sendCommand(aaudio_command_opcode opCode,
std::shared_ptr<AAudioCommandParam> param,
bool waitForReply,
int64_t timeoutNanos) {
return mCommandQueue.sendCommand(std::make_shared<AAudioCommand>(
opCode, param, waitForReply, timeoutNanos));
}
mCommandQueue是进程内的指令队列,也支持同步指令。
service 内部的消费线程会不停地读取指令并执行,对于当前指令,对应的操作如下:
case GET_DESCRIPTION: {
auto param = (GetDescriptionParam *) command->parameter.get();
command->result = param == nullptr ? AAUDIO_ERROR_ILLEGAL_ARGUMENT
: getDescription_l(param->mParcelable);
}
接着往下看:
aaudio_result_t AAudioServiceStreamBase::getDescription_l(AudioEndpointParcelable* parcelable) {
{
std::lock_guard<std::mutex> lock(mUpMessageQueueLock);
if (mUpMessageQueue == nullptr) {
ALOGE("%s(): mUpMessageQueue null! - stream not open", __func__);
return AAUDIO_ERROR_NULL;
}
// Gather information on the message queue.
mUpMessageQueue->fillParcelable(parcelable,
parcelable->mUpMessageQueueParcelable);
}
return getAudioDataDescription_l(parcelable);
}
这儿就可以看到是包装指令信息和数据信息。
先看下包装指令信息,这儿的mUpMessageQueue 本质上是一个Ringbuffer,也就是SharedRingBuffer,里面存放指令信息,并且通过ashmem 支持进程间共享:
void SharedRingBuffer::fillParcelable(AudioEndpointParcelable* endpointParcelable,
RingBufferParcelable &ringBufferParcelable) {
int fdIndex = endpointParcelable->addFileDescriptor(mFileDescriptor, mSharedMemorySizeInBytes);
ringBufferParcelable.setupMemory(fdIndex,
SHARED_RINGBUFFER_DATA_OFFSET,
mDataMemorySizeInBytes,
SHARED_RINGBUFFER_READ_OFFSET,
SHARED_RINGBUFFER_WRITE_OFFSET,
sizeof(fifo_counter_t));
ringBufferParcelable.setBytesPerFrame(mFifoBuffer->getBytesPerFrame());
ringBufferParcelable.setFramesPerBurst(1);
ringBufferParcelable.setCapacityInFrames(mCapacityInFrames);
}
这儿关键的就是mFileDescriptor,这个是共享内存的fd信息:
aaudio_result_t SharedRingBuffer::allocate(fifo_frames_t bytesPerFrame,
fifo_frames_t capacityInFrames) {
mCapacityInFrames = capacityInFrames;
// Create shared memory large enough to hold the data and the read and write counters.
mDataMemorySizeInBytes = bytesPerFrame * capacityInFrames;
mSharedMemorySizeInBytes = mDataMemorySizeInBytes + (2 * (sizeof(fifo_counter_t)));
mFileDescriptor.reset(ashmem_create_region("AAudioSharedRingBuffer", mSharedMemorySizeInBytes));
if (mFileDescriptor.get() == -1) {
ALOGE("allocate() ashmem_create_region() failed %d", errno);
return AAUDIO_ERROR_INTERNAL;
}
ALOGV("allocate() mFileDescriptor = %d\n", mFileDescriptor.get());
int err = ashmem_set_prot_region(mFileDescriptor.get(), PROT_READ|PROT_WRITE); // TODO error handling?
if (err < 0) {
ALOGE("allocate() ashmem_set_prot_region() failed %d", errno);
mFileDescriptor.reset();
return AAUDIO_ERROR_INTERNAL; // TODO convert errno to a better AAUDIO_ERROR;
}
// Map the fd to memory addresses. Use a temporary pointer to keep the mmap result and update
// it to `mSharedMemory` only when mmap operate successfully.
auto tmpPtr = (uint8_t *) mmap(nullptr, mSharedMemorySizeInBytes,
PROT_READ|PROT_WRITE,
MAP_SHARED,
mFileDescriptor.get(), 0);
if (tmpPtr == MAP_FAILED) {
ALOGE("allocate() mmap() failed %d", errno);
mFileDescriptor.reset();
return AAUDIO_ERROR_INTERNAL; // TODO convert errno to a better AAUDIO_ERROR;
}
mSharedMemory = tmpPtr;
// Get addresses for our counters and data from the shared memory.
auto readCounterAddress = (fifo_counter_t *) &mSharedMemory[SHARED_RINGBUFFER_READ_OFFSET];
auto writeCounterAddress = (fifo_counter_t *) &mSharedMemory[SHARED_RINGBUFFER_WRITE_OFFSET];
uint8_t *dataAddress = &mSharedMemory[SHARED_RINGBUFFER_DATA_OFFSET];
mFifoBuffer = std::make_shared<FifoBufferIndirect>(bytesPerFrame, capacityInFrames,
readCounterAddress, writeCounterAddress, dataAddress);
return AAUDIO_OK;
}
这儿的逻辑可以看成是三段论:
- 先利用ashmem_create_region创建一块共享内存,在内核的实现就是利用了虚拟文件映射了一块物理内存,不同进程访问该文件实际上操作的是同一块物理内存,实现的关键就在于缺页异常的处理,后续有机会我们可以再介绍一波。
- 利用ashmem_set_prot_region设置共享内存的权限信息
- 利用mmap将该文件映射到用户态的虚拟空间中,这样本进程就可以正常访问了。其他进程拿到fd后也同样mmap一波,就也可以访问了,这就是实现共享内存的逻辑。
这儿还有一个有趣的地方,其他进程是如何拿到fd呢? 可能有人会想,fd不就是一个普通整数么?一个int ipc过去不就行了么?如果了解fd的本质的话,就会发现问题远远没有这么简单,因为fd在内核态只是进程文件打开表数组的索引,只在本进程有效,换成其他进程就不认识了,甚至可能对应了不同的文件,那就严重了。那是如何跨进程分享fd的呢?这个就是binder巧妙的地方了,实际上在binder传递fd时候,会使用特殊的标记,binder的内核驱动识别到该标记后,就会知道这个对应的是一个文件,就会利用fd找到这个文件对象,然后共享到目标进程时,就会在该进程的文件表数组中分配一个新的fd对应该文件对象,这样目标进程就得到了分享之后的fd,虽然这个fd 数值可能甚至大概率会和原始进程的fd不一样,可是对应的是同一个文件对象,这样就可以放心共享文件了。
接下来再看包装数据信息,这儿会涉及到共享模式和独占模式的首个区别,先看共享模式吧:
/**
* Get an immutable description of the data queue created by this service.
*/
aaudio_result_t AAudioServiceStreamShared::getAudioDataDescription_l(
AudioEndpointParcelable* parcelable)
{
std::lock_guard<std::mutex> lock(audioDataQueueLock);
if (mAudioDataQueue == nullptr) {
ALOGW("%s(): mUpMessageQueue null! - stream not open", __func__);
return AAUDIO_ERROR_NULL;
}
// Gather information on the data queue.
mAudioDataQueue->fillParcelable(parcelable,
parcelable->mDownDataQueueParcelable);
parcelable->mDownDataQueueParcelable.setFramesPerBurst(getFramesPerBurst());
return AAUDIO_OK;
}
这儿的mAudioDataQueue是在打开共享模式时候创建的:
{
std::lock_guard<std::mutex> lock(audioDataQueueLock);
// Create audio data shared memory buffer for client.
mAudioDataQueue = std::make_shared<SharedRingBuffer>();
result = mAudioDataQueue->allocate(calculateBytesPerFrame(), getBufferCapacity());
if (result != AAUDIO_OK) {
ALOGE("%s() could not allocate FIFO with %d frames",
__func__, getBufferCapacity());
result = AAUDIO_ERROR_NO_MEMORY;
goto error;
}
}
可以看到也是一个SharedRingBuffer, 那接下来我们就知道了共享模式本质上的共享内存是在aaudioserver中。
那接下来看下独占模式的:
// Get an immutable description of the data queue from the HAL.
aaudio_result_t AAudioServiceStreamMMAP::getAudioDataDescription_l(
AudioEndpointParcelable* parcelable)
{
sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote();
if (endpoint == nullptr) {
ALOGE("%s() has no endpoint", __func__);
return AAUDIO_ERROR_INVALID_STATE;
}
sp<AAudioServiceEndpointMMAP> serviceEndpointMMAP =
static_cast<AAudioServiceEndpointMMAP *>(endpoint.get());
return serviceEndpointMMAP->getDownDataDescription(parcelable);
}
可以看到这儿并不是在aaudioserver中创建共享内存,会有些区别,我们跟一下看看:
/**
* Get an immutable description of the data queue from the HAL.
*/
aaudio_result_t AAudioServiceEndpointMMAP::getDownDataDescription(
AudioEndpointParcelable* parcelable)
{
if (mAudioDataWrapper->setupFifoBuffer(calculateBytesPerFrame(), getBufferCapacity())
!= AAUDIO_OK) {
ALOGE("Failed to setup audio data wrapper, will not be able to "
"set data for sound dose computation");
// This will not affect the audio processing capability
}
// Gather information on the data queue based on HAL info.
mAudioDataWrapper->fillParcelable(parcelable, parcelable->mDownDataQueueParcelable,
calculateBytesPerFrame(), mFramesPerBurst,
getBufferCapacity(),
getDirection() == AAUDIO_DIRECTION_OUTPUT
? SharedMemoryWrapper::WRITE
: SharedMemoryWrapper::NONE);
return AAUDIO_OK;
}
这儿的mAudioDataWrapper是在打开mmap时候创建的:
aaudio_result_t AAudioServiceEndpointMMAP::open(const aaudio::AAudioStreamRequest &request) {
aaudio_result_t result = AAUDIO_OK;
mAudioDataWrapper = std::make_unique<SharedMemoryWrapper>();
copyFrom(request.getConstantConfiguration());
mRequestedDeviceId = getDeviceId();
在从hal层创建完恭喜那个内存后,会将拿到的共享内存fd赋值过去:
aaudio_result_t AAudioServiceEndpointMMAP::createMmapBuffer()
{
memset(&mMmapBufferinfo, 0, sizeof(struct audio_mmap_buffer_info));
int32_t minSizeFrames = getBufferCapacity();
if (minSizeFrames <= 0) { // zero will get rejected
minSizeFrames = AAUDIO_BUFFER_CAPACITY_MIN;
}
const status_t status = mMmapStream->createMmapBuffer(minSizeFrames, &mMmapBufferinfo);
const bool isBufferShareable = mMmapBufferinfo.flags & AUDIO_MMAP_APPLICATION_SHAREABLE;
if (status != OK) {
ALOGE("%s() - createMmapBuffer() failed with status %d %s",
__func__, status, strerror(-status));
return AAUDIO_ERROR_UNAVAILABLE;
} else {
ALOGD("%s() createMmapBuffer() buffer_size = %d fr, burst_size %d fr"
", Sharable FD: %s",
__func__,
mMmapBufferinfo.buffer_size_frames,
mMmapBufferinfo.burst_size_frames,
isBufferShareable ? "Yes" : "No");
}
setBufferCapacity(mMmapBufferinfo.buffer_size_frames);
if (!isBufferShareable) {
// Exclusive mode can only be used by the service because the FD cannot be shared.
const int32_t audioServiceUid =
VALUE_OR_FATAL(legacy2aidl_uid_t_int32_t(getuid()));
if ((mMmapClient.attributionSource.uid != audioServiceUid) &&
getSharingMode() == AAUDIO_SHARING_MODE_EXCLUSIVE) {
ALOGW("%s() - exclusive FD cannot be used by client", __func__);
return AAUDIO_ERROR_UNAVAILABLE;
}
}
// AAudio creates a copy of this FD and retains ownership of the copy.
// Assume that AudioFlinger will close the original shared_memory_fd.
mAudioDataWrapper->getDataFileDescriptor().reset(dup(mMmapBufferinfo.shared_memory_fd));
if (mAudioDataWrapper->getDataFileDescriptor().get() == -1) {
ALOGE("%s() - could not dup shared_memory_fd", __func__);
return AAUDIO_ERROR_INTERNAL;
}
// Call to HAL to make sure the transport FD was able to be closed by binder.
// This is a tricky workaround for a problem in Binder.
// TODO:[b/192048842] When that problem is fixed we may be able to remove or change this code.
struct audio_mmap_position position;
mMmapStream->getMmapPosition(&position);
mFramesPerBurst = mMmapBufferinfo.burst_size_frames;
return AAUDIO_OK;
}
mAudioDataWrapper 是SharedMemoryWrapper,在初始化的时候也会创建一块共享内存:
constexpr int COUNTER_SIZE_IN_BYTES = sizeof(android::fifo_counter_t);
constexpr int WRAPPER_SIZE_IN_BYTES = 2 * COUNTER_SIZE_IN_BYTES;
SharedMemoryWrapper::SharedMemoryWrapper() {
mCounterFd.reset(ashmem_create_region("AAudioSharedMemoryWrapper", WRAPPER_SIZE_IN_BYTES));
if (mCounterFd.get() == -1) {
ALOGE("allocate() ashmem_create_region() failed %d", errno);
return;
}
int err = ashmem_set_prot_region(mCounterFd.get(), PROT_READ|PROT_WRITE);
if (err < 0) {
ALOGE("allocate() ashmem_set_prot_region() failed %d", errno);
mCounterFd.reset();
return;
}
auto tmpPtr = (uint8_t *) mmap(nullptr, WRAPPER_SIZE_IN_BYTES,
PROT_READ|PROT_WRITE,
MAP_SHARED,
mCounterFd.get(), 0);
if (tmpPtr == MAP_FAILED) {
ALOGE("allocate() mmap() failed %d", errno);
mCounterFd.reset();
return;
}
mCounterMemoryAddress = tmpPtr;
mReadCounterAddress = (android::fifo_counter_t*) mCounterMemoryAddress;
mWriteCounterAddress = (android::fifo_counter_t*) &mCounterMemoryAddress[COUNTER_SIZE_IN_BYTES];
}
不过这儿并不包含数据信息,可以看到这块共享内存的大小仅仅是2个fifo_counter_t, 也就是存放读写的位置信息。很容易想到,那数据就应该是hal中返回给我们的了。
aaudio_result_t SharedMemoryWrapper::setupFifoBuffer(android::fifo_frames_t bytesPerFrame,
android::fifo_frames_t capacityInFrames) {
if (mDataFd.get() == -1) {
ALOGE("%s data file descriptor is not initialized", __func__);
return AAUDIO_ERROR_INTERNAL;
}
if (mCounterMemoryAddress == nullptr) {
ALOGE("%s the counter memory is not allocated correctly", __func__);
return AAUDIO_ERROR_INTERNAL;
}
mSharedMemorySizeInBytes = bytesPerFrame * capacityInFrames;
auto tmpPtr = (uint8_t *) mmap(nullptr, mSharedMemorySizeInBytes,
PROT_READ|PROT_WRITE,
MAP_SHARED,
mDataFd.get(), 0);
if (tmpPtr == MAP_FAILED) {
ALOGE("allocate() mmap() failed %d", errno);
return AAUDIO_ERROR_INTERNAL;
}
mSharedMemory = tmpPtr;
mFifoBuffer = std::make_shared<android::FifoBufferIndirect>(
bytesPerFrame, capacityInFrames, mReadCounterAddress,
mWriteCounterAddress, mSharedMemory);
return AAUDIO_OK;
}
mDataFd中存放的就是hal中返回的共享内存fd。接下来就是先共享内存,映射到用户态逻辑地址中。
接下来我们也能想到,就需要把位置和数据对应的共享内存信息parcel起来了:
void SharedMemoryWrapper::fillParcelable(
AudioEndpointParcelable* endpointParcelable, RingBufferParcelable &ringBufferParcelable,
int32_t bytesPerFrame, int32_t framesPerBurst, int32_t capacityInFrames,
CounterFilling counterFilling) {
const int capacityInBytes = bytesPerFrame * capacityInFrames;
const int dataFdIndex =
endpointParcelable->addFileDescriptor(mDataFd, mSharedMemorySizeInBytes);
ringBufferParcelable.setBytesPerFrame(bytesPerFrame);
ringBufferParcelable.setFramesPerBurst(framesPerBurst);
ringBufferParcelable.setCapacityInFrames(capacityInFrames);
if (mCounterFd.get() == -1 || counterFilling == NONE) {
// Failed to create shared memory for read/write counter or requesting no filling counters.
ALOGD("%s no counter is filled, counterFd=%d", __func__, mCounterFd.get());
ringBufferParcelable.setupMemory(dataFdIndex, 0, capacityInBytes);
} else {
int counterFdIndex =
endpointParcelable->addFileDescriptor(mCounterFd, WRAPPER_SIZE_IN_BYTES);
const int readCounterSize = (counterFilling & READ) == NONE ? 0 : COUNTER_SIZE_IN_BYTES;
const int writeCounterSize = (counterFilling & WRITE) == NONE ? 0 : COUNTER_SIZE_IN_BYTES;
ALOGD("%s counterFdIndex=%d readCounterSize=%d, writeCounterSize=%d",
__func__, counterFdIndex, readCounterSize, writeCounterSize);
ringBufferParcelable.setupMemory(
{dataFdIndex, 0 /*offset*/, capacityInBytes},
{counterFdIndex, 0 /*offset*/, readCounterSize},
{counterFdIndex, COUNTER_SIZE_IN_BYTES, writeCounterSize});
}
}
果然和我们的猜想一致。这儿就能得到共享模式和独占模式的一个关键差异,共享模式还需要在系统服务中共享一块内存,作为hal层共享内存的中间代理,而独占模式就把中间商干掉了,这时候就很容易理解为什么独占模式延时更低了吧。
接下来回到AudioStreamInternal::requestStart_l(), 第二个逻辑是prepareBuffersForStart, 我们可以盲猜一下,应该是清理下内存,这儿采集就是空实现,应该是认为采集不需要,播放会主动清理下:
void AudioStreamInternalPlay::prepareBuffersForStart() {
// Prevent stale data from being played.
mAudioEndpoint->eraseDataMemory();
}
继续往下看:
void AudioEndpoint::eraseDataMemory() {
if (mDataQueue != nullptr) {
mDataQueue->eraseMemory();
}
}
mDataQueue 就是Fifo buffer,我们可以看下实现:
void FifoBuffer::eraseMemory() {
int32_t numBytes = convertFramesToBytes(getBufferCapacityInFrames());
if (numBytes > 0) {
memset(getStorage(), 0, (size_t) numBytes);
}
}
这儿的getStorage 其实就是清理共享buffer。
接下来继续看向server端请求startStream:
Status AAudioService::startStream(int32_t streamHandle, int32_t *_aidl_return) {
static_assert(std::is_same_v<aaudio_result_t, std::decay_t<typeof(*_aidl_return)>>);
const sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
if (serviceStream.get() == nullptr) {
ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
AIDL_RETURN(AAUDIO_ERROR_INVALID_HANDLE);
}
AIDL_RETURN(serviceStream->start());
}
这儿就是请求内部的流执行start了:
aaudio_result_t AAudioServiceStreamBase::start() {
return sendCommand(START, nullptr, true /*waitForReply*/, TIMEOUT_NANOS);
}
此时请求就变成抛一个指令,变成异步形式了, 在steam的内部命令线程收到START指令后,会执行如下操作
switch (command->operationCode) {
case START:
command->result = start_l();
timestampScheduler.setBurstPeriod(mFramesPerBurst, getSampleRate());
timestampScheduler.start(AudioClock::getNanoseconds());
nextTimestampReportTime = timestampScheduler.nextAbsoluteTime();
nextDataReportTime = nextDataReportTime_l();
break;
我们先看下start_l:
aaudio_result_t AAudioServiceStreamBase::start_l() {
const int64_t beginNs = AudioClock::getNanoseconds();
aaudio_result_t result = AAUDIO_OK;
if (auto state = getState();
state == AAUDIO_STREAM_STATE_CLOSED || isDisconnected_l()) {
ALOGW("%s() already CLOSED, returns INVALID_STATE, handle = %d",
__func__, getHandle());
return AAUDIO_ERROR_INVALID_STATE;
}
if (mStandby) {
ALOGW("%s() the stream is standby, return ERROR_STANDBY, "
"expecting the client call exitStandby before start", __func__);
return AAUDIO_ERROR_STANDBY;
}
mediametrics::Defer defer([&] {
mediametrics::LogItem(mMetricsId)
.set(AMEDIAMETRICS_PROP_EVENT, AMEDIAMETRICS_PROP_EVENT_VALUE_START)
.set(AMEDIAMETRICS_PROP_EXECUTIONTIMENS, (int64_t)(AudioClock::getNanoseconds() - beginNs))
.set(AMEDIAMETRICS_PROP_STATE, AudioGlobal_convertStreamStateToText(getState()))
.set(AMEDIAMETRICS_PROP_STATUS, (int32_t)result)
.record(); });
if (isRunning()) {
return result;
}
setFlowing(false);
setSuspended(false);
// Start with fresh presentation timestamps.
mAtomicStreamTimestamp.clear();
mClientHandle = AUDIO_PORT_HANDLE_NONE;
result = startDevice();
if (result != AAUDIO_OK) goto error;
// This should happen at the end of the start.
sendServiceEvent(AAUDIO_SERVICE_EVENT_STARTED, static_cast<int64_t>(mClientHandle));
setState(AAUDIO_STREAM_STATE_STARTED);
return result;
error:
disconnect_l();
return result;
}
这儿的逻辑要点如下:
- 状态检查,如果已经是关闭状态,那就返回失败,如果是待机模式,也返回对应错误码,通知调用方先退出待机再启动;
- 更新内部状态,比如mFlowing表示是否收到了首帧,mSuspended表示是否buffer满了;
- 继续向底层传递startDevice指令,让底层的流真正启动起来;
- 通知应用状态变更
介绍下待机模式,就是在创建好mmap通道后,关闭流不销毁mmap通道,这样后续使用mmap通道就可以不需要再次创建了,可以在反复启动采播场景节省功耗。
这儿主要的就是startDevice了,接下来看下这个流程:
aaudio_result_t AAudioServiceStreamBase::startDevice() {
mClientHandle = AUDIO_PORT_HANDLE_NONE;
sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote();
if (endpoint == nullptr) {
ALOGE("%s() has no endpoint", __func__);
return AAUDIO_ERROR_INVALID_STATE;
}
return endpoint->startStream(this, &mClientHandle);
}
继续跟入AAudioServiceEndpoint看下, 这时候需要区分共享模式和独占模式,先看下独占模式:
aaudio_result_t AAudioServiceEndpointMMAP::startStream(sp<AAudioServiceStreamBase> stream,
audio_port_handle_t *clientHandle __unused) {
// Start the client on behalf of the AAudio service.
// Use the port handle that was provided by openMmapStream().
audio_port_handle_t tempHandle = mPortHandle;
audio_attributes_t attr = {};
if (stream != nullptr) {
attr = getAudioAttributesFrom(stream.get());
}
const aaudio_result_t result = startClient(
mMmapClient, stream == nullptr ? nullptr : &attr, &tempHandle);
// When AudioFlinger is passed a valid port handle then it should not change it.
LOG_ALWAYS_FATAL_IF(tempHandle != mPortHandle,
"%s() port handle not expected to change from %d to %d",
__func__, mPortHandle, tempHandle);
ALOGV("%s() mPortHandle = %d", __func__, mPortHandle);
return result;
}
这儿主要就是调用了下startClient,继续跟入看下:
aaudio_result_t AAudioServiceEndpointMMAP::startClient(const android::AudioClient& client,
const audio_attributes_t *attr,
audio_port_handle_t *clientHandle) {
return mMmapStream == nullptr
? AAUDIO_ERROR_NULL
: AAudioConvert_androidToAAudioResult(mMmapStream->start(client, attr, clientHandle));
}
这儿的mMmapStream 就是MmapStreamInterface, 可以想到又会走到hal层进行通知,我们跟着看一下,首先在AudioFlinger中:
status_t MmapThreadHandle::start(const AudioClient& client,
const audio_attributes_t *attr, audio_port_handle_t *handle)
{
return mThread->start(client, attr, handle);
}
mThread就是MmapThread:
status_t MmapThread::start(const AudioClient& client,
const audio_attributes_t *attr,
audio_port_handle_t *handle)
{
audio_utils::lock_guard l(mutex());
ALOGV("%s clientUid %d mStandby %d mPortId %d *handle %d", __FUNCTION__,
client.attributionSource.uid, mStandby, mPortId, *handle);
if (mHalStream == 0) {
return NO_INIT;
}
status_t ret;
// For the first track, reuse portId and session allocated when the stream was opened.
if (*handle == mPortId) {
acquireWakeLock_l();
return NO_ERROR;
}
audio_port_handle_t portId = AUDIO_PORT_HANDLE_NONE;
audio_io_handle_t io = mId;
const AttributionSourceState adjAttributionSource = afutils::checkAttributionSourcePackage(
client.attributionSource);
const auto localSessionId = mSessionId;
auto localAttr = mAttr;
if (isOutput()) {
audio_config_t config = AUDIO_CONFIG_INITIALIZER;
config.sample_rate = mSampleRate;
config.channel_mask = mChannelMask;
config.format = mFormat;
audio_stream_type_t stream = streamType_l();
audio_output_flags_t flags =
(audio_output_flags_t)(AUDIO_OUTPUT_FLAG_MMAP_NOIRQ | AUDIO_OUTPUT_FLAG_DIRECT);
audio_port_handle_t deviceId = mDeviceId;
std::vector<audio_io_handle_t> secondaryOutputs;
bool isSpatialized;
bool isBitPerfect;
mutex().unlock();
ret = AudioSystem::getOutputForAttr(&localAttr, &io,
localSessionId,
&stream,
adjAttributionSource,
&config,
flags,
&deviceId,
&portId,
&secondaryOutputs,
&isSpatialized,
&isBitPerfect);
mutex().lock();
mAttr = localAttr;
ALOGD_IF(!secondaryOutputs.empty(),
"MmapThread::start does not support secondary outputs, ignoring them");
} else {
audio_config_base_t config;
config.sample_rate = mSampleRate;
config.channel_mask = mChannelMask;
config.format = mFormat;
audio_port_handle_t deviceId = mDeviceId;
mutex().unlock();
ret = AudioSystem::getInputForAttr(&localAttr, &io,
RECORD_RIID_INVALID,
localSessionId,
adjAttributionSource,
&config,
AUDIO_INPUT_FLAG_MMAP_NOIRQ,
&deviceId,
&portId);
mutex().lock();
// localAttr is const for getInputForAttr.
}
// APM should not chose a different input or output stream for the same set of attributes
// and audo configuration
if (ret != NO_ERROR || io != mId) {
ALOGE("%s: error getting output or input from APM (error %d, io %d expected io %d)",
__FUNCTION__, ret, io, mId);
return BAD_VALUE;
}
if (isOutput()) {
mutex().unlock();
ret = AudioSystem::startOutput(portId);
mutex().lock();
} else {
{
// Add the track record before starting input so that the silent status for the
// client can be cached.
setClientSilencedState_l(portId, false /*silenced*/);
}
mutex().unlock();
ret = AudioSystem::startInput(portId);
mutex().lock();
}
// abort if start is rejected by audio policy manager
if (ret != NO_ERROR) {
ALOGE("%s: error start rejected by AudioPolicyManager = %d", __FUNCTION__, ret);
if (!mActiveTracks.isEmpty()) {
mutex().unlock();
if (isOutput()) {
AudioSystem::releaseOutput(portId);
} else {
AudioSystem::releaseInput(portId);
}
mutex().lock();
} else {
mHalStream->stop();
}
eraseClientSilencedState_l(portId);
return PERMISSION_DENIED;
}
// Given that MmapThread::mAttr is mutable, should a MmapTrack have attributes ?
sp<IAfMmapTrack> track = IAfMmapTrack::create(
this, attr == nullptr ? mAttr : *attr, mSampleRate, mFormat,
mChannelMask, mSessionId, isOutput(),
client.attributionSource,
IPCThreadState::self()->getCallingPid(), portId);
if (!isOutput()) {
track->setSilenced_l(isClientSilenced_l(portId));
}
if (isOutput()) {
// force volume update when a new track is added
mHalVolFloat = -1.0f;
} else if (!track->isSilenced_l()) {
for (const sp<IAfMmapTrack>& t : mActiveTracks) {
if (t->isSilenced_l()
&& t->uid() != static_cast<uid_t>(client.attributionSource.uid)) {
t->invalidate();
}
}
}
mActiveTracks.add(track);
sp<IAfEffectChain> chain = getEffectChain_l(mSessionId);
if (chain != 0) {
chain->setStrategy(getStrategyForStream(streamType_l()));
chain->incTrackCnt();
chain->incActiveTrackCnt();
}
track->logBeginInterval(patchSinksToString(&mPatch)); // log to MediaMetrics
*handle = portId;
if (mActiveTracks.size() == 1) {
ret = exitStandby_l();
}
broadcast_l();
ALOGV("%s DONE status %d handle %d stream %p", __FUNCTION__, ret, *handle, mHalStream.get());
return ret;
}
关键逻辑如下:
- 从AudioSystem中获取属性信息,这块最终会到AudioPolicyService中查询,AudioPolicyService 是从系统配置信息中获取到的
- 通知AudioSystem采播状态, 我理解这儿也是为了更新状态,并不会操作hal
- 创建mmap track
- 通知hal层启动
这儿可能会有疑问,既然mmap是buffer共享,那就不需要在audioflinger这儿再cache 一次了,也就是不需要mmap track了,这儿还是创建了,为什么?在回答这个问题前,可以先简单看下mmap track的实现:
status_t MmapTrack::start(AudioSystem::sync_event_t event __unused,
audio_session_t triggerSession __unused)
{
return NO_ERROR;
}
void MmapTrack::stop()
{
}
// AudioBufferProvider interface
status_t MmapTrack::getNextBuffer(AudioBufferProvider::Buffer* buffer)
{
buffer->frameCount = 0;
buffer->raw = nullptr;
return INVALID_OPERATION;
}
// ExtendedAudioBufferProvider interface
size_t MmapTrack::framesReady() const {
return 0;
}
int64_t MmapTrack::framesReleased() const
{
return 0;
}
void MmapTrack::onTimestamp(const ExtendedTimestamp& timestamp __unused)
{
}
全是空实现,也就是这儿仅仅是为了让在audioflinger中呈现一个记录,这样我们通过dump audioflinger 信息就可以看到系统中所有的采播信息了。
当然这儿也不全部都是空实现,也会有silence信息记录。这样就可以实现在接听电话时候,aaudio的播放音量自然变成0,采集也一样。
不过MmapTrack也不完全是空实现,也有部分是有内容的:
void MmapTrack::processMuteEvent_l(const sp<IAudioManager>& audioManager, mute_state_t muteState)
{
if (mMuteState == muteState) {
// mute state did not change, do nothing
return;
}
status_t result = UNKNOWN_ERROR;
if (audioManager && mPortId != AUDIO_PORT_HANDLE_NONE) {
if (mMuteEventExtras == nullptr) {
mMuteEventExtras = std::make_unique<os::PersistableBundle>();
}
mMuteEventExtras->putInt(String16(kExtraPlayerEventMuteKey),
static_cast<int>(muteState));
result = audioManager->portEvent(mPortId,
PLAYER_UPDATE_MUTED,
mMuteEventExtras);
}
if (result == OK) {
ALOGI("%s(%d): processed mute state for port ID %d from %d to %d", __func__, id(), mPortId,
static_cast<int>(mMuteState), static_cast<int>(muteState));
mMuteState = muteState;
} else {
ALOGW("%s(%d): cannot process mute state for port ID %d, status error %d",
__func__,
id(),
mPortId,
result);
}
}
这儿的AudioManager对应的就是Java中的AudioService,这样就能让AudioSerivce中负责记录采播状态的各种Monitor检测到,然后应用就可以感知到这些变化信息了。
我们接下来继续看hal层是如何启动的:
status_t MmapThread::exitStandby_l()
{
// The HAL must receive track metadata before starting the stream
updateMetadata_l();
status_t ret = mHalStream->start();
if (ret != NO_ERROR) {
ALOGE("%s: error mHalStream->start() = %d for first track", __FUNCTION__, ret);
return ret;
}
if (mStandby) {
mThreadMetrics.logBeginInterval();
mThreadSnapshot.onBegin();
mStandby = false;
}
return NO_ERROR;
}
启动mmap 对应的就是退出standby模式,到了这儿就终于看到了操作hal了。
status_t StreamHalAidl::start() {
ALOGD("%p %s::%s", this, getClassName().c_str(), __func__);
TIME_CHECK();
if (!mStream) return NO_INIT;
if (!mContext.isMmapped()) {
return BAD_VALUE;
}
StreamDescriptor::Reply reply;
RETURN_STATUS_IF_ERROR(updateCountersIfNeeded(&reply));
switch (reply.state) {
case StreamDescriptor::State::STANDBY:
RETURN_STATUS_IF_ERROR(
sendCommand(makeHalCommand<HalCommand::Tag::start>(), &reply, true));
if (reply.state != StreamDescriptor::State::IDLE) {
ALOGE("%s: unexpected stream state: %s (expected IDLE)",
__func__, toString(reply.state).c_str());
return INVALID_OPERATION;
}
FALLTHROUGH_INTENDED;
case StreamDescriptor::State::IDLE:
RETURN_STATUS_IF_ERROR(
sendCommand(makeHalCommand<HalCommand::Tag::burst>(0), &reply, true));
if (reply.state != StreamDescriptor::State::ACTIVE) {
ALOGE("%s: unexpected stream state: %s (expected ACTIVE)",
__func__, toString(reply.state).c_str());
return INVALID_OPERATION;
}
FALLTHROUGH_INTENDED;
case StreamDescriptor::State::ACTIVE:
return OK;
case StreamDescriptor::State::DRAINING:
RETURN_STATUS_IF_ERROR(
sendCommand(makeHalCommand<HalCommand::Tag::start>(), &reply, true));
if (reply.state != StreamDescriptor::State::ACTIVE) {
ALOGE("%s: unexpected stream state: %s (expected ACTIVE)",
__func__, toString(reply.state).c_str());
return INVALID_OPERATION;
}
return OK;
default:
ALOGE("%s: not supported from %s stream state %s",
__func__, mIsInput ? "input" : "output", toString(reply.state).c_str());
return INVALID_OPERATION;
}
}
从这儿看到又是一个命令模式,在不同状态机下发送一个start命令:
status_t StreamHalAidl::sendCommand(
const ::aidl::android::hardware::audio::core::StreamDescriptor::Command& command,
::aidl::android::hardware::audio::core::StreamDescriptor::Reply* reply,
bool safeFromNonWorkerThread, StatePositions* statePositions) {
// TIME_CHECK(); // TODO(b/243839867) reenable only when optimized.
if (!safeFromNonWorkerThread) {
const pid_t workerTid = mWorkerTid.load(std::memory_order_acquire);
LOG_ALWAYS_FATAL_IF(workerTid != gettid(),
"%s %s: must be invoked from the worker thread (%d)",
__func__, command.toString().c_str(), workerTid);
}
StreamDescriptor::Reply localReply{};
{
std::lock_guard l(mCommandReplyLock);
if (!mContext.getCommandMQ()->writeBlocking(&command, 1)) {
ALOGE("%s: failed to write command %s to MQ", __func__, command.toString().c_str());
return NOT_ENOUGH_DATA;
}
if (reply == nullptr) {
reply = &localReply;
}
if (!mContext.getReplyMQ()->readBlocking(reply, 1)) {
ALOGE("%s: failed to read from reply MQ, command %s",
__func__, command.toString().c_str());
return NOT_ENOUGH_DATA;
}
{
std::lock_guard l(mLock);
// Not every command replies with 'latencyMs' field filled out, substitute the last
// returned value in that case.
if (reply->latencyMs <= 0) {
reply->latencyMs = mLastReply.latencyMs;
}
mLastReply = *reply;
mLastReplyExpirationNs = uptimeNanos() + mLastReplyLifeTimeNs;
if (!mIsInput && reply->status == STATUS_OK) {
if (command.getTag() == StreamDescriptor::Command::standby &&
reply->state == StreamDescriptor::State::STANDBY) {
mStatePositions.framesAtStandby = reply->observable.frames;
} else if (command.getTag() == StreamDescriptor::Command::flush &&
reply->state == StreamDescriptor::State::IDLE) {
mStatePositions.framesAtFlushOrDrain = reply->observable.frames;
} else if (!mContext.isAsynchronous() &&
command.getTag() == StreamDescriptor::Command::drain &&
(reply->state == StreamDescriptor::State::IDLE ||
reply->state == StreamDescriptor::State::DRAINING)) {
mStatePositions.framesAtFlushOrDrain = reply->observable.frames;
} // for asynchronous drain, the frame count is saved in 'onAsyncDrainReady'
}
if (statePositions != nullptr) {
*statePositions = mStatePositions;
}
}
}
switch (reply->status) {
case STATUS_OK: return OK;
case STATUS_BAD_VALUE: return BAD_VALUE;
case STATUS_INVALID_OPERATION: return INVALID_OPERATION;
case STATUS_NOT_ENOUGH_DATA: return NOT_ENOUGH_DATA;
default:
ALOGE("%s: unexpected status %d returned for command %s",
__func__, reply->status, command.toString().c_str());
return INVALID_OPERATION;
}
}
从代码上看这儿就是把指令写到了指令队列中,同时读取回复。 那是哪儿会执行这个指令呢?只有hal了, 这儿以采集为例子:
StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
// Note: for input streams, draining is driven by the client, thus
// "empty buffer" condition can only happen while handling the 'burst'
// command. Thus, unlike for output streams, it does not make sense to
// delay the 'DRAINING' state here by 'mTransientStateDelayMs'.
// TODO: Add a delay for transitions of async operations when/if they added.
StreamDescriptor::Command command{};
if (!mContext->getCommandMQ()->readBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": reading of command from MQ failed";
mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
using Tag = StreamDescriptor::Command::Tag;
using LogSeverity = ::android::base::LogSeverity;
const LogSeverity severity =
command.getTag() == Tag::burst || command.getTag() == Tag::getStatus
? LogSeverity::VERBOSE
: LogSeverity::DEBUG;
LOG(severity) << __func__ << ": received command " << command.toString() << " in "
<< kThreadName;
StreamDescriptor::Reply reply{};
reply.status = STATUS_BAD_VALUE;
switch (command.getTag()) {
case Tag::halReservedExit: {
const int32_t cookie = command.get<Tag::halReservedExit>();
StreamInWorkerLogic::Status status = Status::CONTINUE;
if (cookie == (mContext->getInternalCommandCookie() ^ getTid())) {
mDriver->shutdown();
setClosed();
status = Status::EXIT;
} else {
LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie;
}
if (cookie != 0) { // This is an internal command, no need to reply.
return status;
}
// `cookie == 0` can only occur in the context of a VTS test, need to reply.
break;
}
case Tag::getStatus:
populateReply(&reply, mIsConnected);
break;
case Tag::start:
if (mState == StreamDescriptor::State::STANDBY ||
mState == StreamDescriptor::State::DRAINING) {
if (::android::status_t status = mDriver->start(); status == ::android::OK) {
populateReply(&reply, mIsConnected);
mState = mState == StreamDescriptor::State::STANDBY
? StreamDescriptor::State::IDLE
: StreamDescriptor::State::ACTIVE;
} else {
LOG(ERROR) << __func__ << ": start failed: " << status;
mState = StreamDescriptor::State::ERROR;
}
} else {
populateReplyWrongState(&reply, command);
}
break;
case Tag::burst:
if (const int32_t fmqByteCount = command.get<Tag::burst>(); fmqByteCount >= 0) {
LOG(VERBOSE) << __func__ << ": '" << toString(command.getTag()) << "' command for "
<< fmqByteCount << " bytes";
if (mState == StreamDescriptor::State::IDLE ||
mState == StreamDescriptor::State::ACTIVE ||
mState == StreamDescriptor::State::PAUSED ||
mState == StreamDescriptor::State::DRAINING) {
if (!read(fmqByteCount, &reply)) {
mState = StreamDescriptor::State::ERROR;
}
if (mState == StreamDescriptor::State::IDLE ||
mState == StreamDescriptor::State::PAUSED) {
mState = StreamDescriptor::State::ACTIVE;
} else if (mState == StreamDescriptor::State::DRAINING) {
// To simplify the reference code, we assume that the read operation
// has consumed all the data remaining in the hardware buffer.
// In a real implementation, here we would either remain in
// the 'DRAINING' state, or transfer to 'STANDBY' depending on the
// buffer state.
mState = StreamDescriptor::State::STANDBY;
}
} else {
populateReplyWrongState(&reply, command);
}
} else {
LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount;
}
break;
case Tag::drain:
if (const auto mode = command.get<Tag::drain>();
mode == StreamDescriptor::DrainMode::DRAIN_UNSPECIFIED) {
if (mState == StreamDescriptor::State::ACTIVE) {
if (::android::status_t status = mDriver->drain(mode);
status == ::android::OK) {
populateReply(&reply, mIsConnected);
mState = StreamDescriptor::State::DRAINING;
} else {
LOG(ERROR) << __func__ << ": drain failed: " << status;
mState = StreamDescriptor::State::ERROR;
}
} else {
populateReplyWrongState(&reply, command);
}
} else {
LOG(WARNING) << __func__ << ": invalid drain mode: " << toString(mode);
}
break;
case Tag::standby:
if (mState == StreamDescriptor::State::IDLE) {
populateReply(&reply, mIsConnected);
if (::android::status_t status = mDriver->standby(); status == ::android::OK) {
mState = StreamDescriptor::State::STANDBY;
} else {
LOG(ERROR) << __func__ << ": standby failed: " << status;
mState = StreamDescriptor::State::ERROR;
}
} else {
populateReplyWrongState(&reply, command);
}
break;
case Tag::pause:
if (mState == StreamDescriptor::State::ACTIVE) {
if (::android::status_t status = mDriver->pause(); status == ::android::OK) {
populateReply(&reply, mIsConnected);
mState = StreamDescriptor::State::PAUSED;
} else {
LOG(ERROR) << __func__ << ": pause failed: " << status;
mState = StreamDescriptor::State::ERROR;
}
} else {
populateReplyWrongState(&reply, command);
}
break;
case Tag::flush:
if (mState == StreamDescriptor::State::PAUSED) {
if (::android::status_t status = mDriver->flush(); status == ::android::OK) {
populateReply(&reply, mIsConnected);
mState = StreamDescriptor::State::STANDBY;
} else {
LOG(ERROR) << __func__ << ": flush failed: " << status;
mState = StreamDescriptor::State::ERROR;
}
} else {
populateReplyWrongState(&reply, command);
}
break;
}
reply.state = mState;
LOG(severity) << __func__ << ": writing reply " << reply.toString();
if (!mContext->getReplyMQ()->writeBlocking(&reply, 1)) {
LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
return Status::CONTINUE;
}
这儿就是按照指令解释,我们看到对应start,核心的就是执行mDriver->start()
, 这儿的mDriver是DriverInterface,我们接下来看下如何执行的start, 可以想到这儿的Driver就是alsa:
::android::status_t StreamAlsa::start() {
if (!mAlsaDeviceProxies.empty()) {
// This is a resume after a pause.
return ::android::OK;
}
decltype(mAlsaDeviceProxies) alsaDeviceProxies;
for (const auto& device : getDeviceProfiles()) {
alsa::DeviceProxy proxy;
if (device.isExternal) {
// Always ask alsa configure as required since the configuration should be supported
// by the connected device. That is guaranteed by `setAudioPortConfig` and
// `setAudioPatch`.
proxy = alsa::openProxyForExternalDevice(
device, const_cast<struct pcm_config*>(&mConfig.value()),
true /*require_exact_match*/);
} else {
proxy = alsa::openProxyForAttachedDevice(
device, const_cast<struct pcm_config*>(&mConfig.value()), mBufferSizeFrames);
}
if (proxy.get() == nullptr) {
return ::android::NO_INIT;
}
alsaDeviceProxies.push_back(std::move(proxy));
}
mAlsaDeviceProxies = std::move(alsaDeviceProxies);
return ::android::OK;
}
对于扬声器,麦克风之类的都是内置的,因此就看下openProxyForAttachedDevice:
DeviceProxy openProxyForAttachedDevice(const DeviceProfile& deviceProfile,
struct pcm_config* pcmConfig, size_t bufferFrameCount) {
if (deviceProfile.isExternal) {
LOG(FATAL) << __func__ << ": called for an external device, address=" << deviceProfile;
}
DeviceProxy proxy(deviceProfile);
if (!profile_fill_builtin_device_info(proxy.getProfile(), pcmConfig, bufferFrameCount)) {
LOG(FATAL) << __func__ << ": failed to init for built-in device, address=" << deviceProfile;
}
if (int err = proxy_prepare_from_default_config(proxy.get(), proxy.getProfile()); err != 0) {
LOG(FATAL) << __func__ << ": fail to prepare for device address=" << deviceProfile
<< " error=" << err;
return DeviceProxy();
}
if (int err = proxy_open(proxy.get()); err != 0) {
LOG(ERROR) << __func__ << ": failed to open device, address=" << deviceProfile
<< " error=" << err;
return DeviceProxy();
}
return proxy;
}
这儿我们看下proxy_open 就好了,前面这个都是设置配置:
int proxy_open(alsa_device_proxy * proxy)
{
const alsa_device_profile* profile = proxy->profile;
ALOGD("proxy_open(card:%d device:%d %s)", profile->card, profile->device,
profile->direction == PCM_OUT ? "PCM_OUT" : "PCM_IN");
if (profile->card < 0 || profile->device < 0) {
return -EINVAL;
}
proxy->pcm = pcm_open(profile->card, profile->device,
profile->direction | ALSA_CLOCK_TYPE, &proxy->alsa_config);
if (proxy->pcm == NULL) {
return -ENOMEM;
}
if (!pcm_is_ready(proxy->pcm)) {
ALOGE(" proxy_open() pcm_is_ready() failed: %s", pcm_get_error(proxy->pcm));
#if defined(LOG_PCM_PARAMS)
log_pcm_config(&proxy->alsa_config, "config");
#endif
pcm_close(proxy->pcm);
proxy->pcm = NULL;
return -ENOMEM;
}
return 0;
}
这儿关键的调用就是pcm_open:
struct pcm *pcm_open(unsigned int card, unsigned int device,
unsigned int flags, struct pcm_config *config)
{
struct pcm *pcm;
struct snd_pcm_info info;
struct snd_pcm_hw_params params;
struct snd_pcm_sw_params sparams;
int rc, pcm_type;
if (!config) {
return &bad_pcm; /* TODO: could support default config here */
}
pcm = calloc(1, sizeof(struct pcm));
if (!pcm) {
oops(&bad_pcm, ENOMEM, "can't allocate PCM object");
return &bad_pcm; /* TODO: could support default config here */
}
pcm->config = *config;
pcm->flags = flags;
pcm->snd_node = snd_utils_get_dev_node(card, device, NODE_PCM);
pcm_type = snd_utils_get_node_type(pcm->snd_node);
if (pcm_type == SND_NODE_TYPE_PLUGIN)
pcm->ops = &plug_ops;
else
pcm->ops = &hw_ops;
pcm->fd = pcm->ops->open(card, device, flags, &pcm->data, pcm->snd_node);
if (pcm->fd < 0) {
oops(&bad_pcm, errno, "cannot open device %u for card %u",
device, card);
goto fail_open;
}
if (pcm->ops->ioctl(pcm->data, SNDRV_PCM_IOCTL_INFO, &info)) {
oops(&bad_pcm, errno, "cannot get info");
goto fail_close;
}
pcm->subdevice = info.subdevice;
param_init(¶ms);
param_set_mask(¶ms, SNDRV_PCM_HW_PARAM_FORMAT,
pcm_format_to_alsa(config->format));
param_set_mask(¶ms, SNDRV_PCM_HW_PARAM_SUBFORMAT,
SNDRV_PCM_SUBFORMAT_STD);
param_set_min(¶ms, SNDRV_PCM_HW_PARAM_PERIOD_SIZE, config->period_size);
param_set_int(¶ms, SNDRV_PCM_HW_PARAM_SAMPLE_BITS,
pcm_format_to_bits(config->format));
param_set_int(¶ms, SNDRV_PCM_HW_PARAM_FRAME_BITS,
pcm_format_to_bits(config->format) * config->channels);
param_set_int(¶ms, SNDRV_PCM_HW_PARAM_CHANNELS,
config->channels);
param_set_int(¶ms, SNDRV_PCM_HW_PARAM_PERIODS, config->period_count);
param_set_int(¶ms, SNDRV_PCM_HW_PARAM_RATE, config->rate);
if (flags & PCM_NOIRQ) {
if (!(flags & PCM_MMAP)) {
oops(&bad_pcm, EINVAL, "noirq only currently supported with mmap().");
goto fail_close;
}
params.flags |= SNDRV_PCM_HW_PARAMS_NO_PERIOD_WAKEUP;
pcm->noirq_frames_per_msec = config->rate / 1000;
}
if (flags & PCM_MMAP)
param_set_mask(¶ms, SNDRV_PCM_HW_PARAM_ACCESS,
SNDRV_PCM_ACCESS_MMAP_INTERLEAVED);
else
param_set_mask(¶ms, SNDRV_PCM_HW_PARAM_ACCESS,
SNDRV_PCM_ACCESS_RW_INTERLEAVED);
if (pcm->ops->ioctl(pcm->data, SNDRV_PCM_IOCTL_HW_PARAMS, ¶ms)) {
oops(&bad_pcm, errno, "cannot set hw params");
goto fail_close;
}
/* get our refined hw_params */
config->period_size = param_get_int(¶ms, SNDRV_PCM_HW_PARAM_PERIOD_SIZE);
config->period_count = param_get_int(¶ms, SNDRV_PCM_HW_PARAM_PERIODS);
pcm->buffer_size = config->period_count * config->period_size;
if (flags & PCM_MMAP) {
pcm->mmap_buffer = pcm->ops->mmap(pcm->data, NULL,
pcm_frames_to_bytes(pcm, pcm->buffer_size),
PROT_READ | PROT_WRITE, MAP_FILE | MAP_SHARED, 0);
if (pcm->mmap_buffer == MAP_FAILED) {
oops(&bad_pcm, errno, "failed to mmap buffer %d bytes\n",
pcm_frames_to_bytes(pcm, pcm->buffer_size));
goto fail_close;
}
}
memset(&sparams, 0, sizeof(sparams));
sparams.tstamp_mode = SNDRV_PCM_TSTAMP_ENABLE;
sparams.period_step = 1;
if (!config->start_threshold) {
if (pcm->flags & PCM_IN)
pcm->config.start_threshold = sparams.start_threshold = 1;
else
pcm->config.start_threshold = sparams.start_threshold =
config->period_count * config->period_size / 2;
} else
sparams.start_threshold = config->start_threshold;
/* pick a high stop threshold - todo: does this need further tuning */
if (!config->stop_threshold) {
if (pcm->flags & PCM_IN)
pcm->config.stop_threshold = sparams.stop_threshold =
config->period_count * config->period_size * 10;
else
pcm->config.stop_threshold = sparams.stop_threshold =
config->period_count * config->period_size;
}
else
sparams.stop_threshold = config->stop_threshold;
if (!pcm->config.avail_min) {
if (pcm->flags & PCM_MMAP)
pcm->config.avail_min = sparams.avail_min = pcm->config.period_size;
else
pcm->config.avail_min = sparams.avail_min = 1;
} else
sparams.avail_min = config->avail_min;
sparams.xfer_align = config->period_size / 2; /* needed for old kernels */
sparams.silence_threshold = config->silence_threshold;
sparams.silence_size = config->silence_size;
if (pcm->ops->ioctl(pcm->data, SNDRV_PCM_IOCTL_SW_PARAMS, &sparams)) {
oops(&bad_pcm, errno, "cannot set sw params");
goto fail;
}
pcm->boundary = sparams.boundary;
rc = pcm_hw_mmap_status(pcm);
if (rc < 0) {
oops(&bad_pcm, errno, "mmap status failed");
goto fail;
}
#ifdef SNDRV_PCM_IOCTL_TTSTAMP
if (pcm->flags & PCM_MONOTONIC) {
int arg = SNDRV_PCM_TSTAMP_TYPE_MONOTONIC;
rc = pcm->ops->ioctl(pcm->data, SNDRV_PCM_IOCTL_TTSTAMP, &arg);
if (rc < 0) {
oops(&bad_pcm, errno, "cannot set timestamp type");
goto fail;
}
}
#endif
pcm->xruns = 0;
return pcm;
fail:
if (flags & PCM_MMAP)
pcm->ops->munmap(pcm->data, pcm->mmap_buffer, pcm_frames_to_bytes(pcm, pcm->buffer_size));
fail_close:
pcm->ops->close(pcm->data);
fail_open:
snd_utils_put_dev_node(pcm->snd_node);
free(pcm);
return &bad_pcm;
}
启动的操作都是通过ops传递的,那ops是什么呢?看下定义:
struct pcm_ops hw_ops = {
.open = pcm_hw_open,
.close = pcm_hw_close,
.ioctl = pcm_hw_ioctl,
.mmap = pcm_hw_mmap,
.munmap = pcm_hw_munmap,
.poll = pcm_hw_poll,
};
那open就是pcm_hw_open:
static int pcm_hw_open(unsigned int card, unsigned int device,
unsigned int flags, void **data,
__attribute__((unused)) void *node)
{
struct pcm_hw_data *hw_data;
char fn[256];
int fd;
hw_data = calloc(1, sizeof(*hw_data));
if (!hw_data) {
return -ENOMEM;
}
snprintf(fn, sizeof(fn), "/dev/snd/pcmC%uD%u%c", card, device,
flags & PCM_IN ? 'c' : 'p');
fd = open(fn, O_RDWR|O_NONBLOCK);
if (fd < 0) {
printf("%s: cannot open device '%s'", __func__, fn);
return fd;
}
if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) < 0) {
printf("%s: failed to reset blocking mode '%s'",
__func__, fn);
goto err_close;
}
hw_data->snd_node = node;
hw_data->card = card;
hw_data->device = device;
hw_data->fd = fd;
*data = hw_data;
return fd;
err_close:
close(fd);
free(hw_data);
return -ENODEV;
}
这个操作就打开内核节点。通过ioctl也可以进行其他信息获取。
这样我们就看到了整个start的流程了。再回到AAudioServiceStreamBase中,我们之前卖了一个关子,看的是独占模式的startDevice, 那共享模式的startDevice又是什么呢?接下来看下:
aaudio_result_t AAudioServiceEndpointShared::startStream(
sp<AAudioServiceStreamBase> sharedStream,
audio_port_handle_t *clientHandle)
NO_THREAD_SAFETY_ANALYSIS {
aaudio_result_t result = AAUDIO_OK;
{
std::lock_guard<std::mutex> lock(mLockStreams);
if (++mRunningStreamCount == 1) { // atomic
result = getStreamInternal()->systemStart();
if (result != AAUDIO_OK) {
--mRunningStreamCount;
} else {
result = startSharingThread_l();
if (result != AAUDIO_OK) {
getStreamInternal()->systemStopFromApp();
--mRunningStreamCount;
}
}
}
}
if (result == AAUDIO_OK) {
const audio_attributes_t attr = getAudioAttributesFrom(sharedStream.get());
result = getStreamInternal()->startClient(
sharedStream->getAudioClient(), &attr, clientHandle);
if (result != AAUDIO_OK) {
if (--mRunningStreamCount == 0) { // atomic
stopSharingThread();
getStreamInternal()->systemStopFromApp();
}
}
}
return result;
}
这儿的getStreamInternal获取的是mStreamInternal,会被多个client共享,这儿的逻辑也可以看出来,当首次启动的时候,会执行启动,之后就只添加client了。这个逻辑对于理解共享模式和独占模式比较关键。
接下来先回顾下mStreamInternal的来源,以采集为例,播放也类似:
AAudioServiceEndpointCapture::AAudioServiceEndpointCapture(AAudioService& audioService)
: AAudioServiceEndpointShared(
new AudioStreamInternalCapture(audioService.asAAudioServiceInterface(), true)) {
}
其实就是AudioStreamInternalCapture,看到这个是否有熟悉的感觉?没错,就是client端创建mmap时候的采集对象,区别就是之前创建的入参是:
stream = new AudioStreamInternalCapture(AAudioBinderClient::getInstance(),
false);
后面这个bool就表示是否在audioservice内部,client端创建的时候必然不在,server端创建必然就在了。
看到这儿是不是还是有点懵?那我们再回顾下共享模式的open:
// Share an AudioStreamInternal.
aaudio_result_t AAudioServiceEndpointShared::open(const aaudio::AAudioStreamRequest &request) {
aaudio_result_t result = AAUDIO_OK;
const AAudioStreamConfiguration &configuration = request.getConstantConfiguration();
copyFrom(configuration);
mRequestedDeviceId = configuration.getDeviceId();
AudioStreamBuilder builder;
builder.copyFrom(configuration);
builder.setSharingMode(AAUDIO_SHARING_MODE_EXCLUSIVE);
// Don't fall back to SHARED because that would cause recursion.
builder.setSharingModeMatchRequired(true);
builder.setBufferCapacity(DEFAULT_BUFFER_CAPACITY);
result = mStreamInternal->open(builder);
setSampleRate(mStreamInternal->getSampleRate());
setChannelMask(mStreamInternal->getChannelMask());
setDeviceId(mStreamInternal->getDeviceId());
setSessionId(mStreamInternal->getSessionId());
setFormat(AUDIO_FORMAT_PCM_FLOAT); // force for mixer
setHardwareSampleRate(mStreamInternal->getHardwareSampleRate());
setHardwareFormat(mStreamInternal->getHardwareFormat());
setHardwareSamplesPerFrame(mStreamInternal->getHardwareSamplesPerFrame());
mFramesPerBurst = mStreamInternal->getFramesPerBurst();
return result;
}
是不是有种茅塞顿开的感觉,共享模式原来也是独占模式!!!,当时区别在于该独占模式的流client感知不到,而是让server端持有,这样就可以方便server端做一些暗箱操作,这儿是不是很妙?
看到了这儿,systemStart 就自然也就清楚了,就是把我们独占模式的流程再走一遍。再看下startSharingThread_l:
aaudio_result_t aaudio::AAudioServiceEndpointShared::startSharingThread_l() {
// Launch the callback loop thread.
int64_t periodNanos = getStreamInternal()->getFramesPerBurst()
* AAUDIO_NANOS_PER_SECOND
/ getSampleRate();
mCallbackEnabled.store(true);
// Prevent this object from getting deleted before the thread has a chance to create
// its strong pointer. Assume the thread will call decStrong().
this->incStrong(nullptr);
aaudio_result_t result = getStreamInternal()->createThread(periodNanos,
aaudio_endpoint_thread_proc,
this);
if (result != AAUDIO_OK) {
this->decStrong(nullptr); // Because the thread won't do it.
}
return result;
}
这个操作就是启动流内部的线程:
aaudio_result_t createThread(int64_t periodNanoseconds,
aaudio_audio_thread_proc_t threadProc,
void *threadArg)
EXCLUDES(mStreamLock) {
std::lock_guard<std::mutex> lock(mStreamLock);
return createThread_l(periodNanoseconds, threadProc, threadArg);
}
接下来继续看下创建细节:
// This is not exposed in the API.
// But it is still used internally to implement callbacks for MMAP mode.
aaudio_result_t AudioStream::createThread_l(int64_t periodNanoseconds,
aaudio_audio_thread_proc_t threadProc,
void* threadArg)
{
if (mHasThread) {
ALOGD("%s() - previous thread was not joined, join now to be safe", __func__);
joinThread_l(nullptr);
}
if (threadProc == nullptr) {
return AAUDIO_ERROR_NULL;
}
// Pass input parameters to the background thread.
mThreadProc = threadProc;
mThreadArg = threadArg;
setPeriodNanoseconds(periodNanoseconds);
mHasThread = true;
// Prevent this object from getting deleted before the thread has a chance to create
// its strong pointer. Assume the thread will call decStrong().
this->incStrong(nullptr);
int err = pthread_create(&mThread, nullptr, AudioStream_internalThreadProc, this);
if (err != 0) {
android::status_t status = -errno;
ALOGE("%s() - pthread_create() failed, %d", __func__, status);
this->decStrong(nullptr); // Because the thread won't do it.
mHasThread = false;
return AAudioConvert_androidToAAudioResult(status);
} else {
// TODO Use AAudioThread or maybe AndroidThread
// Name the thread with an increasing index, "AAudio_#", for debugging.
static std::atomic<uint32_t> nextThreadIndex{1};
char name[16]; // max length for a pthread_name
uint32_t index = nextThreadIndex++;
// Wrap the index so that we do not hit the 16 char limit
// and to avoid hard-to-read large numbers.
index = index % 100000; // arbitrary
snprintf(name, sizeof(name), "AAudio_%u", index);
err = pthread_setname_np(mThread, name);
ALOGW_IF((err != 0), "Could not set name of AAudio thread. err = %d", err);
return AAUDIO_OK;
}
}
到了这儿就知道了MMap时候各种AAudio_数字的线程是怎么来的了吧?
继续看下AudioStream_internalThreadProc:
// This registers the callback thread with the server before
// passing control to the app. This gives the server an opportunity to boost
// the thread's performance characteristics.
void* AudioStream::wrapUserThread() {
void* procResult = nullptr;
mThreadRegistrationResult = registerThread();
if (mThreadRegistrationResult == AAUDIO_OK) {
// Run callback loop. This may take a very long time.
procResult = mThreadProc(mThreadArg);
mThreadRegistrationResult = unregisterThread();
}
return procResult;
}
// This is the entry point for the new thread created by createThread_l().
// It converts the 'C' function call to a C++ method call.
static void* AudioStream_internalThreadProc(void* threadArg) {
AudioStream *audioStream = (AudioStream *) threadArg;
// Prevent the stream from being deleted while being used.
// This is just for extra safety. It is probably not needed because
// this callback should be joined before the stream is closed.
android::sp<AudioStream> protectedStream(audioStream);
// Balance the incStrong() in createThread_l().
protectedStream->decStrong(nullptr);
return protectedStream->wrapUserThread();
}
这儿需要关注的就是registerThread和mThreadProc,前者注册线程,后者执行loop,先看下注册线程,我们直接看重点:
aaudio_result_t AAudioServiceStreamBase::registerAudioThread_l(
pid_t ownerPid, pid_t clientThreadId, int priority) {
aaudio_result_t result = AAUDIO_OK;
if (getRegisteredThread() != AAudioServiceStreamBase::ILLEGAL_THREAD_ID) {
ALOGE("AAudioService::registerAudioThread(), thread already registered");
result = AAUDIO_ERROR_INVALID_STATE;
} else {
setRegisteredThread(clientThreadId);
int err = android::requestPriority(ownerPid, clientThreadId,
priority, true /* isForApp */);
if (err != 0) {
ALOGE("AAudioService::registerAudioThread(%d) failed, errno = %d, priority = %d",
clientThreadId, errno, priority);
result = AAUDIO_ERROR_INTERNAL;
}
}
return result;
}
这儿关键的是android::requestPriority, audioservice 帮忙client 申请线程权限。看到这儿发现Android的实现也有点“细节控”,考虑的很全,或许也正是这样的细节控,android 体验越来越好了,看到这儿,在下要为Android点个赞。
接下来看mThreadProc:
// Glue between C and C++ callbacks.
static void *aaudio_endpoint_thread_proc(void *arg) {
assert(arg != nullptr);
ALOGD("%s() called", __func__);
// Prevent the stream from being deleted while being used.
// This is just for extra safety. It is probably not needed because
// this callback should be joined before the stream is closed.
auto endpointPtr = static_cast<AAudioServiceEndpointShared *>(arg);
android::sp<AAudioServiceEndpointShared> endpoint(endpointPtr);
// Balance the incStrong() in startSharingThread_l().
endpoint->decStrong(nullptr);
void *result = endpoint->callbackLoop();
// Close now so that the HW resource is freed and we can open a new device.
if (!endpoint->isConnected()) {
ALOGD("%s() call safeReleaseCloseFromCallback()", __func__);
// Release and close under a lock with no check for callback collisions.
endpoint->getStreamInternal()->safeReleaseCloseInternal();
}
return result;
}
这儿就是执行一个loop,可以看下采集的:
// Read data from the shared MMAP stream and then distribute it to the client streams.
void *AAudioServiceEndpointCapture::callbackLoop() {
ALOGD("callbackLoop() entering");
aaudio_result_t result = AAUDIO_OK;
int64_t timeoutNanos = getStreamInternal()->calculateReasonableTimeout();
// result might be a frame count
while (mCallbackEnabled.load() && getStreamInternal()->isActive() && (result >= 0)) {
int64_t mmapFramesRead = getStreamInternal()->getFramesRead();
// Read audio data from stream using a blocking read.
result = getStreamInternal()->read(mDistributionBuffer.get(),
getFramesPerBurst(), timeoutNanos);
if (result == AAUDIO_ERROR_DISCONNECTED) {
ALOGD("%s() read() returned AAUDIO_ERROR_DISCONNECTED", __func__);
AAudioServiceEndpointShared::handleDisconnectRegisteredStreamsAsync();
break;
} else if (result != getFramesPerBurst()) {
ALOGW("callbackLoop() read %d / %d",
result, getFramesPerBurst());
break;
}
// Distribute data to each active stream.
{ // brackets are for lock_guard
std::lock_guard <std::mutex> lock(mLockStreams);
for (const auto& clientStream : mRegisteredStreams) {
if (clientStream->isRunning() && !clientStream->isSuspended()) {
sp<AAudioServiceStreamShared> streamShared =
static_cast<AAudioServiceStreamShared *>(clientStream.get());
streamShared->writeDataIfRoom(mmapFramesRead,
mDistributionBuffer.get(),
getFramesPerBurst());
}
}
}
}
ALOGD("callbackLoop() exiting");
return nullptr; // TODO review
}
到了这儿感觉就像工作日开车到了高速上,前后都没人,越开越丝滑,这儿的逻辑也是越看越豁然开朗。
这儿就是先利用mStreamInternal 主动读数据,然后再统一发放给共享该mmap的client,也就是mRegisteredStreams。
再看下播放的,也可以猜到流程了,就是从各个应用读数据,然后mix,写给mmap通道:
// Mix data from each application stream and write result to the shared MMAP stream.
void *AAudioServiceEndpointPlay::callbackLoop() {
ALOGD("%s() entering >>>>>>>>>>>>>>> MIXER", __func__);
aaudio_result_t result = AAUDIO_OK;
int64_t timeoutNanos = getStreamInternal()->calculateReasonableTimeout();
// result might be a frame count
while (mCallbackEnabled.load() && getStreamInternal()->isActive() && (result >= 0)) {
// Mix data from each active stream.
mMixer.clear();
{ // brackets are for lock_guard
int index = 0;
int64_t mmapFramesWritten = getStreamInternal()->getFramesWritten();
std::lock_guard <std::mutex> lock(mLockStreams);
for (const auto& clientStream : mRegisteredStreams) {
int64_t clientFramesRead = 0;
bool allowUnderflow = true;
if (clientStream->isSuspended()) {
continue; // dead stream
}
aaudio_stream_state_t state = clientStream->getState();
if (state == AAUDIO_STREAM_STATE_STOPPING) {
allowUnderflow = false; // just read what is already in the FIFO
} else if (state != AAUDIO_STREAM_STATE_STARTED) {
continue; // this stream is not running so skip it.
}
sp<AAudioServiceStreamShared> streamShared =
static_cast<AAudioServiceStreamShared *>(clientStream.get());
{
// Lock the AudioFifo to protect against close.
std::lock_guard <std::mutex> lock(streamShared->audioDataQueueLock);
std::shared_ptr<SharedRingBuffer> audioDataQueue
= streamShared->getAudioDataQueue_l();
std::shared_ptr<FifoBuffer> fifo;
if (audioDataQueue && (fifo = audioDataQueue->getFifoBuffer())) {
// Determine offset between framePosition in client's stream
// vs the underlying MMAP stream.
clientFramesRead = fifo->getReadCounter();
// These two indices refer to the same frame.
int64_t positionOffset = mmapFramesWritten - clientFramesRead;
streamShared->setTimestampPositionOffset(positionOffset);
int32_t framesMixed = mMixer.mix(index, fifo, allowUnderflow);
if (streamShared->isFlowing()) {
// Consider it an underflow if we got less than a burst
// after the data started flowing.
bool underflowed = allowUnderflow
&& framesMixed < mMixer.getFramesPerBurst();
if (underflowed) {
streamShared->incrementXRunCount();
}
} else if (framesMixed > 0) {
// Mark beginning of data flow after a start.
streamShared->setFlowing(true);
}
clientFramesRead = fifo->getReadCounter();
}
}
if (clientFramesRead > 0) {
// This timestamp represents the completion of data being read out of the
// client buffer. It is sent to the client and used in the timing model
// to decide when the client has room to write more data.
Timestamp timestamp(clientFramesRead, AudioClock::getNanoseconds());
streamShared->markTransferTime(timestamp);
}
index++; // just used for labelling tracks in systrace
}
}
// Write mixer output to stream using a blocking write.
result = getStreamInternal()->write(mMixer.getOutputBuffer(),
getFramesPerBurst(), timeoutNanos);
if (result == AAUDIO_ERROR_DISCONNECTED) {
ALOGD("%s() write() returned AAUDIO_ERROR_DISCONNECTED", __func__);
AAudioServiceEndpointShared::handleDisconnectRegisteredStreamsAsync();
break;
} else if (result != getFramesPerBurst()) {
ALOGW("callbackLoop() wrote %d / %d",
result, getFramesPerBurst());
break;
}
}
ALOGD("%s() exiting, enabled = %d, state = %d, result = %d <<<<<<<<<<<<< MIXER",
__func__, mCallbackEnabled.load(), getStreamInternal()->getState(), result);
return nullptr; // TODO review
}
就是这个逻辑。
接下来再看下startClient,可以猜到最终就是把client的stream 注册到audioflinger中,这样也是dumpsys 就可以看到了:
aaudio_result_t AudioStreamInternal::startClient(const android::AudioClient& client,
const audio_attributes_t *attr,
audio_port_handle_t *portHandle) {
ALOGV("%s() called", __func__);
if (getServiceHandle() == AAUDIO_HANDLE_INVALID) {
return AAUDIO_ERROR_INVALID_STATE;
}
aaudio_result_t result = mServiceInterface.startClient(mServiceStreamHandleInfo,
client, attr, portHandle);
ALOGV("%s(%d) returning %d", __func__, *portHandle, result);
return result;
}
果然调用又回到了audioservice中了:
aaudio_result_t AAudioService::startClient(aaudio_handle_t streamHandle,
const android::AudioClient& client,
const audio_attributes_t *attr,
audio_port_handle_t *clientHandle) {
const sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
if (serviceStream.get() == nullptr) {
ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
return AAUDIO_ERROR_INVALID_HANDLE;
}
return serviceStream->startClient(client, attr, clientHandle);
}
结下来会到mmap中,因为这儿的streaminternal 就是mmap stream:
aaudio_result_t AAudioServiceStreamMMAP::startClient(const android::AudioClient& client,
const audio_attributes_t *attr,
audio_port_handle_t *clientHandle) {
sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote();
if (endpoint == nullptr) {
ALOGE("%s() has no endpoint", __func__);
return AAUDIO_ERROR_INVALID_STATE;
}
// Start the client on behalf of the application. Generate a new porthandle.
aaudio_result_t result = endpoint->startClient(client, attr, clientHandle);
return result;
}
接下来再看:
aaudio_result_t AAudioServiceEndpointMMAP::startClient(const android::AudioClient& client,
const audio_attributes_t *attr,
audio_port_handle_t *clientHandle) {
return mMmapStream == nullptr
? AAUDIO_ERROR_NULL
: AAudioConvert_androidToAAudioResult(mMmapStream->start(client, attr, clientHandle));
}
这样就和之前看的mmap start 流程一样了,走到MmapThread里,一顿注册之类的。
那AAudioServiceEndpointShared 中的client是啥时候注册的呢?是open的时候:
aaudio_result_t AAudioServiceStreamShared::open(const aaudio::AAudioStreamRequest &request) {
sp<AAudioServiceStreamShared> keep(this);
if (request.getConstantConfiguration().getSharingMode() != AAUDIO_SHARING_MODE_SHARED) {
ALOGE("%s() sharingMode mismatch %d", __func__,
request.getConstantConfiguration().getSharingMode());
return AAUDIO_ERROR_INTERNAL;
}
aaudio_result_t result = AAudioServiceStreamBase::open(request);
if (result != AAUDIO_OK) {
return result;
}
const AAudioStreamConfiguration &configurationInput = request.getConstantConfiguration();
sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote();
if (endpoint == nullptr) {
result = AAUDIO_ERROR_INVALID_STATE;
goto error;
}
// Is the request compatible with the shared endpoint?
setFormat(configurationInput.getFormat());
if (getFormat() == AUDIO_FORMAT_DEFAULT) {
setFormat(AUDIO_FORMAT_PCM_FLOAT);
} else if (getFormat() != AUDIO_FORMAT_PCM_FLOAT) {
ALOGD("%s() audio_format_t mAudioFormat = %d, need FLOAT", __func__, getFormat());
result = AAUDIO_ERROR_INVALID_FORMAT;
goto error;
}
setSampleRate(configurationInput.getSampleRate());
if (getSampleRate() == AAUDIO_UNSPECIFIED) {
setSampleRate(endpoint->getSampleRate());
} else if (getSampleRate() != endpoint->getSampleRate()) {
ALOGD("%s() mSampleRate = %d, need %d",
__func__, getSampleRate(), endpoint->getSampleRate());
result = AAUDIO_ERROR_INVALID_RATE;
goto error;
}
setChannelMask(configurationInput.getChannelMask());
if (getChannelMask() == AAUDIO_UNSPECIFIED) {
setChannelMask(endpoint->getChannelMask());
} else if (getSamplesPerFrame() != endpoint->getSamplesPerFrame()) {
ALOGD("%s() mSamplesPerFrame = %#x, need %#x",
__func__, getSamplesPerFrame(), endpoint->getSamplesPerFrame());
result = AAUDIO_ERROR_OUT_OF_RANGE;
goto error;
}
setBufferCapacity(calculateBufferCapacity(configurationInput.getBufferCapacity(),
mFramesPerBurst));
if (getBufferCapacity() < 0) {
result = getBufferCapacity(); // negative error code
setBufferCapacity(0);
goto error;
}
{
std::lock_guard<std::mutex> lock(audioDataQueueLock);
// Create audio data shared memory buffer for client.
mAudioDataQueue = std::make_shared<SharedRingBuffer>();
result = mAudioDataQueue->allocate(calculateBytesPerFrame(), getBufferCapacity());
if (result != AAUDIO_OK) {
ALOGE("%s() could not allocate FIFO with %d frames",
__func__, getBufferCapacity());
result = AAUDIO_ERROR_NO_MEMORY;
goto error;
}
}
result = endpoint->registerStream(keep);
if (result != AAUDIO_OK) {
goto error;
}
setState(AAUDIO_STREAM_STATE_OPEN);
return AAUDIO_OK;
error:
close();
return result;
}
看到 endpoint->registerStream(keep)
调用了吧,这里面就是注册:
aaudio_result_t AAudioServiceEndpoint::registerStream(const sp<AAudioServiceStreamBase>& stream) {
const std::lock_guard<std::mutex> lock(mLockStreams);
mRegisteredStreams.push_back(stream);
return AAUDIO_OK;
}
接下来再回到client的requestStart_l中,还有剩余逻辑需要看看:
startTime = AudioClock::getNanoseconds();
mClockModel.start(startTime);
mNeedCatchUp.request(); // Ask data processing code to catch up when first timestamp received.
// Start data callback thread.
if (result == AAUDIO_OK && isDataCallbackSet()) {
// Launch the callback loop thread.
int64_t periodNanos = mCallbackFrames
* AAUDIO_NANOS_PER_SECOND
/ getSampleRate();
mCallbackEnabled.store(true);
result = createThread_l(periodNanos, aaudio_callback_thread_proc, this);
}
if (result != AAUDIO_OK) {
setState(originalState);
}
先启动等时时钟mClockModel, 由于mmap是NOIRQ的,因此就需要时钟来估计进度。mNeedCatchUp是设置一个标记,让其他线程来应答。 接下来也是创建线程,这个流程和我们前面介绍的一样。
看到了这儿,我们来个小总结吧,主要是mmap独占模式和共享模式的区别:
- 共享模式本质上也是独占模式通道
- 独占模式的独占流是在client这边,允许client直接与hal数据读写
- 共享模式的独占流是在server这边,允许server和hal数据读写,并由server分发给client
- 独占模式是client直接与hal共享内存
- 共享模式是server和hal共享内存,server和client共享内存
也就是共享模式本质上就是多个client共享一个独占模式通道。