John Grossman 720aa28279 Switch the way we configure for MediaPlayer retransmission.
This is a cherry-pick of I6ab07d89b2eeb0650e634b8c3b7a0b36aba4e7dd
with merge conflicts addressed by hand and additional changes made in
response to code review feedback.

Move in the direction of a more publishable API for configuring a
media player for retransmission.  It used to be that we used a custom
invoke and a modified URL (prefixed with aahTX://).  There are many
issues with this technique and it was never meant to stand the test of
time.

This CL gets rid of all that.  A new (but currently hidden) method was
introduced to the java level MediaPlayer API, called
setRetransmitTarget(InetSocketAddress), which allows an app writer to
set the retransmit target.  For now, this method needs to be called
before a call to setDataSource (which is pretty unusual for the
MediaPlayer API) because this mid level code uses this as a cue to
instantiate an aahTX player instead of relying on the data source to
select a player.  When retransmit functionality becomes part of the
existing android player implementation, this
set-retrans-before-set-data-source behavior can go away, along with
the aahTX player itself.

Change-Id: I3b46c5227bbf69acb2f3cc4f93cfccad9777be98
Signed-off-by: John Grossman <johngro@google.com>
2012-03-01 14:41:35 -08:00

1178 lines
34 KiB
C++

/*
* Copyright (C) 2011 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define LOG_TAG "LibAAH_RTP"
#include <utils/Log.h>
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <netdb.h>
#include <netinet/ip.h>
#include <common_time/cc_helper.h>
#include <media/IMediaPlayer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/FileSource.h>
#include <media/stagefright/MediaBuffer.h>
#include <media/stagefright/MediaDefs.h>
#include <media/stagefright/MetaData.h>
#include <utils/Timers.h>
#include "aah_tx_packet.h"
#include "aah_tx_player.h"
namespace android {
// Cache watermarks expressed as playback time; used when the stream
// bitrate is known (see onBufferingUpdate).
static int64_t kLowWaterMarkUs = 2000000ll; // 2secs
static int64_t kHighWaterMarkUs = 10000000ll; // 10secs
// Cache watermarks expressed as absolute bytes; the fallback used when
// the stream bitrate is unknown.
static const size_t kLowWaterMarkBytes = 40000;
static const size_t kHighWaterMarkBytes = 200000;
// When we start up, how much lead time should we put on the first access unit?
static const int64_t kAAHStartupLeadTimeUs = 300000LL;
// How much time do we attempt to lead the clock by in steady state?
static const int64_t kAAHBufferTimeUs = 1000000LL;
// How long we keep data in our retransmit buffer after sending it:
// kAAHBufferTimeUs converted from usec to nsec (x1000) plus a 10% margin.
const int64_t AAH_TXPlayer::kAAHRetryKeepAroundTimeNs =
        kAAHBufferTimeUs * 1100;
// Factory entry point used by the media player service to instantiate a
// transmitter player.
sp<MediaPlayerBase> createAAH_TXPlayer() {
    return sp<MediaPlayerBase>(new AAH_TXPlayer());
}
// Constrain |val| to the inclusive range [min, max].
template <typename T> static T clamp(T val, T min, T max) {
    return (val < min) ? min : ((val > max) ? max : val);
}
// Small adapter which lets a TimedEventQueue invoke an arbitrary
// AAH_TXPlayer member function when the event fires.
struct AAH_TXEvent : public TimedEventQueue::Event {
    AAH_TXEvent(AAH_TXPlayer *player,
                void (AAH_TXPlayer::*method)()) : mPlayer(player)
                                                , mMethod(method) {}
  protected:
    virtual ~AAH_TXEvent() {}
    // Called by the queue at the scheduled time; forwards to the bound
    // player method.
    virtual void fire(TimedEventQueue *queue, int64_t /* now_us */) {
        (mPlayer->*mMethod)();
    }
  private:
    AAH_TXPlayer *mPlayer;
    void (AAH_TXPlayer::*mMethod)();
    // Copying is disallowed (declared but never defined).
    AAH_TXEvent(const AAH_TXEvent &);
    AAH_TXEvent& operator=(const AAH_TXEvent &);
};
AAH_TXPlayer::AAH_TXPlayer()
        : mQueueStarted(false)
        , mFlags(0)
        , mExtractorFlags(0) {
    // Make sure the default data source sniffers are registered before any
    // setDataSource call arrives.
    DataSource::RegisterDefaultSniffers();
    // Pre-allocate the two recurring events posted to mQueue.
    mBufferingEvent = new AAH_TXEvent(this, &AAH_TXPlayer::onBufferingUpdate);
    mBufferingEventPending = false;
    mPumpAudioEvent = new AAH_TXEvent(this, &AAH_TXPlayer::onPumpAudio);
    mPumpAudioEventPending = false;
    mAudioCodecData = NULL;
    // reset_l puts all remaining members into their default state.
    reset_l();
}
AAH_TXPlayer::~AAH_TXPlayer() {
    // Stop the event queue thread (if it was ever started) before tearing
    // down the rest of our state.
    if (mQueueStarted) {
        mQueue.stop();
    }
    reset_l();
}
// Cancel events we have posted to mQueue.  As written, BOTH the buffering
// event and the pump-audio event are canceled only when keepBufferingGoing
// is false; a call with keepBufferingGoing == true cancels nothing.
// NOTE(review): the pump-audio cancel looks like it may have been intended
// to sit outside the conditional — confirm against callers (pause_l passes
// true and still expects the pump to be dealt with elsewhere).
void AAH_TXPlayer::cancelPlayerEvents(bool keepBufferingGoing) {
    if (!keepBufferingGoing) {
        mQueue.cancelEvent(mBufferingEvent->eventID());
        mBufferingEventPending = false;
        mQueue.cancelEvent(mPumpAudioEvent->eventID());
        mPumpAudioEventPending = false;
    }
}
// Verify that the common time service is reachable by querying the common
// clock's frequency; if that round trip fails, this player cannot operate
// and we report the error immediately.
status_t AAH_TXPlayer::initCheck() {
    uint64_t commonFreq;
    const status_t res = mCCHelper.getCommonFreq(&commonFreq);
    if (res != OK) {
        ALOGE("Failed to connect to common time service! (res %d)", res);
        return res;
    }
    return OK;
}
// Public URL-based setDataSource: grab the API lock and delegate to the
// locked implementation.
status_t AAH_TXPlayer::setDataSource(
        const char *url,
        const KeyedVector<String8, String8> *headers) {
    Mutex::Autolock autoLock(mLock);
    return setDataSource_l(url, headers);
}
// Locked URL-based setDataSource: records the URI and headers; the actual
// connection happens later in finishSetDataSource_l (during prepare).
status_t AAH_TXPlayer::setDataSource_l(
        const char *url,
        const KeyedVector<String8, String8> *headers) {
    reset_l();
    mUri.setTo(url);
    if (headers) {
        mUriHeaders = *headers;
        ssize_t index = mUriHeaders.indexOfKey(String8("x-hide-urls-from-log"));
        if (index >= 0) {
            // Browser is in "incognito" mode, suppress logging URLs.
            // This isn't something that should be passed to the server.
            mUriHeaders.removeItemsAt(index);
            mFlags |= INCOGNITO;
        }
    }
    // The URL may optionally contain a "#" character followed by a Skyjam
    // cookie. Ideally the cookie header should just be passed in the headers
    // argument, but the Java API for supplying headers is apparently not yet
    // exposed in the SDK used by application developers.
    const char kSkyjamCookieDelimiter = '#';
    char* skyjamCookie = strrchr(mUri.string(), kSkyjamCookieDelimiter);
    if (skyjamCookie) {
        skyjamCookie++;
        // Register the cookie as a header, then strip the "#cookie" suffix
        // (delimiter included) from the stored URI.  Note: the add() must
        // happen before mUri is reassigned, because skyjamCookie points
        // into mUri's current buffer.
        mUriHeaders.add(String8("Cookie"), String8(skyjamCookie));
        mUri = String8(mUri.string(), skyjamCookie - mUri.string());
    }
    return OK;
}
// File-descriptor-based setDataSource.  The descriptor is duplicated; the
// FileSource owns (and is responsible for closing) the duplicate.
status_t AAH_TXPlayer::setDataSource(int fd, int64_t offset, int64_t length) {
    Mutex::Autolock autoLock(mLock);
    reset_l();
    sp<DataSource> source = new FileSource(dup(fd), offset, length);
    const status_t initResult = source->initCheck();
    if (initResult != OK) {
        return initResult;
    }
    mFileSource = source;
    // Hand the source to an extractor and finish setup from there.
    sp<MediaExtractor> extractor = MediaExtractor::Create(source);
    if (extractor == NULL) {
        return UNKNOWN_ERROR;
    }
    return setDataSource_l(extractor);
}
// Video is not supported by this transmitter player; accept the surface
// and ignore it.
status_t AAH_TXPlayer::setVideoSurface(const sp<Surface>& surface) {
    return OK;
}
// Video is not supported by this transmitter player; accept the texture
// and ignore it.
status_t AAH_TXPlayer::setVideoSurfaceTexture(
        const sp<ISurfaceTexture>& surfaceTexture) {
    return OK;
}
// Synchronous prepare is not supported; clients must use prepareAsync.
status_t AAH_TXPlayer::prepare() {
    return INVALID_OPERATION;
}
// Public prepareAsync: take the API lock, then run the locked
// implementation.
status_t AAH_TXPlayer::prepareAsync() {
    Mutex::Autolock autoLock(mLock);
    return prepareAsync_l();
}
// Kick off an asynchronous prepare.  The heavy lifting is deferred to
// onPrepareAsyncEvent, which runs on the event queue thread.
status_t AAH_TXPlayer::prepareAsync_l() {
    // Only one async prepare may be in flight at a time.
    if (mFlags & PREPARING) {
        return UNKNOWN_ERROR; // async prepare already pending
    }
    // Hold a reference to the singleton sender; all packets are queued
    // through it once playback starts.
    mAAH_Sender = AAH_TXSender::GetInstance();
    if (mAAH_Sender == NULL) {
        return NO_MEMORY;
    }
    // Lazily start the event queue thread on first prepare.
    if (!mQueueStarted) {
        mQueue.start();
        mQueueStarted = true;
    }
    mFlags |= PREPARING;
    mAsyncPrepareEvent = new AAH_TXEvent(
            this, &AAH_TXPlayer::onPrepareAsyncEvent);
    mQueue.postEvent(mAsyncPrepareEvent);
    return OK;
}
// Deferred half of the URL-based setDataSource: connect to http(s)
// sources (prefilling the cache first) or create a source directly for
// other schemes, then hand the result to an extractor.
status_t AAH_TXPlayer::finishSetDataSource_l() {
    sp<DataSource> dataSource;
    if (!strncasecmp("http://", mUri.string(), 7) ||
        !strncasecmp("https://", mUri.string(), 8)) {
        mConnectingDataSource = HTTPBase::Create(
                (mFlags & INCOGNITO)
                ? HTTPBase::kFlagIncognito
                : 0);
        // Drop the lock while connecting so that reset_l can interrupt us
        // via mConnectingDataSource->disconnect().
        mLock.unlock();
        status_t err = mConnectingDataSource->connect(mUri, &mUriHeaders);
        mLock.lock();
        if (err != OK) {
            mConnectingDataSource.clear();
            ALOGI("mConnectingDataSource->connect() returned %d", err);
            return err;
        }
        mCachedSource = new NuCachedSource2(mConnectingDataSource);
        mConnectingDataSource.clear();
        dataSource = mCachedSource;
        // We're going to prefill the cache before trying to instantiate
        // the extractor below, as the latter is an operation that otherwise
        // could block on the datasource for a significant amount of time.
        // During that time we'd be unable to abort the preparation phase
        // without this prefill.
        mLock.unlock();
        // Poll (unlocked) until the cache is primed, the source fails, or
        // the prepare is cancelled.
        for (;;) {
            status_t finalStatus;
            size_t cachedDataRemaining =
                    mCachedSource->approxDataRemaining(&finalStatus);
            if (finalStatus != OK ||
                cachedDataRemaining >= kHighWaterMarkBytes ||
                (mFlags & PREPARE_CANCELLED)) {
                break;
            }
            usleep(200000);
        }
        mLock.lock();
        if (mFlags & PREPARE_CANCELLED) {
            ALOGI("Prepare cancelled while waiting for initial cache fill.");
            return UNKNOWN_ERROR;
        }
    } else {
        dataSource = DataSource::CreateFromURI(mUri.string(), &mUriHeaders);
    }
    if (dataSource == NULL) {
        return UNKNOWN_ERROR;
    }
    sp<MediaExtractor> extractor = MediaExtractor::Create(dataSource);
    if (extractor == NULL) {
        return UNKNOWN_ERROR;
    }
    return setDataSource_l(extractor);
}
// Final stage of data source setup: estimate the overall stream bitrate
// and latch the first audio track as our source.  Fails if the extractor
// exposes no audio track at all.
status_t AAH_TXPlayer::setDataSource_l(const sp<MediaExtractor> &extractor) {
    // Attempt to approximate overall stream bitrate by summing all
    // tracks' individual bitrates.  If any track does not advertise a
    // bitrate, record -1 ("unknown") instead.
    int64_t totalBitRate = 0;
    for (size_t i = 0; i < extractor->countTracks(); ++i) {
        sp<MetaData> meta = extractor->getTrackMetaData(i);
        int32_t bitrate;
        if (!meta->findInt32(kKeyBitRate, &bitrate)) {
            totalBitRate = -1;
            break;
        }
        totalBitRate += bitrate;
    }
    mBitrate = totalBitRate;
    ALOGV("mBitrate = %lld bits/sec", mBitrate);
    // Latch the first track with an audio/* MIME type; all other tracks
    // (video, subtitles, ...) are ignored by this transmitter.
    bool haveAudio = false;
    for (size_t i = 0; i < extractor->countTracks(); ++i) {
        sp<MetaData> meta = extractor->getTrackMetaData(i);
        const char *mime;
        CHECK(meta->findCString(kKeyMIMEType, &mime));
        if (!strncasecmp(mime, "audio/", 6)) {
            mAudioSource = extractor->getTrack(i);
            CHECK(mAudioSource != NULL);
            haveAudio = true;
            break;
        }
    }
    if (!haveAudio) {
        return UNKNOWN_ERROR;
    }
    mExtractorFlags = extractor->flags();
    return OK;
}
// Fail an in-flight async prepare: report the error to the listener,
// record the result, clear all prepare-related flags, and wake anyone
// blocked waiting for the prepare to finish (see reset_l).
void AAH_TXPlayer::abortPrepare(status_t err) {
    CHECK(err != OK);
    notifyListener_l(MEDIA_ERROR, MEDIA_ERROR_UNKNOWN, err);
    mPrepareResult = err;
    mFlags &= ~(PREPARING|PREPARE_CANCELLED|PREPARING_CONNECTED);
    mPreparedCondition.broadcast();
}
// Runs on the event queue thread.  Completes the deferred half of
// prepareAsync: finishes URL-based source setup, latches the audio
// format, and gathers the codec-specific data needed to populate TRTP
// packets.
void AAH_TXPlayer::onPrepareAsyncEvent() {
    Mutex::Autolock autoLock(mLock);
    if (mFlags & PREPARE_CANCELLED) {
        ALOGI("prepare was cancelled before doing anything");
        abortPrepare(UNKNOWN_ERROR);
        return;
    }
    // A non-empty URI means setDataSource(url) was used and the connect
    // plus extractor creation still needs to happen.
    if (mUri.size() > 0) {
        status_t err = finishSetDataSource_l();
        if (err != OK) {
            abortPrepare(err);
            return;
        }
    }
    mAudioFormat = mAudioSource->getFormat();
    // NOTE(review): a missing duration is recorded as 1us rather than the
    // -1 "unknown" sentinel which getDuration_l treats specially — confirm
    // this is intentional.
    if (!mAudioFormat->findInt64(kKeyDuration, &mDurationUs))
        mDurationUs = 1;
    const char* mime_type = NULL;
    if (!mAudioFormat->findCString(kKeyMIMEType, &mime_type)) {
        ALOGE("Failed to find audio substream MIME type during prepare.");
        abortPrepare(BAD_VALUE);
        return;
    }
    if (!strcmp(mime_type, MEDIA_MIMETYPE_AUDIO_MPEG)) {
        mAudioCodec = TRTPAudioPacket::kCodecMPEG1Audio;
    } else
    if (!strcmp(mime_type, MEDIA_MIMETYPE_AUDIO_AAC)) {
        mAudioCodec = TRTPAudioPacket::kCodecAACAudio;
        // AAC receivers need the sample rate, the channel count, and the
        // ESDS blob as out-of-band data; gather all three here.
        uint32_t type;
        int32_t sample_rate;
        int32_t channel_count;
        const void* esds_data;
        size_t esds_len;
        if (!mAudioFormat->findInt32(kKeySampleRate, &sample_rate)) {
            ALOGE("Failed to find sample rate for AAC substream.");
            abortPrepare(BAD_VALUE);
            return;
        }
        if (!mAudioFormat->findInt32(kKeyChannelCount, &channel_count)) {
            ALOGE("Failed to find channel count for AAC substream.");
            abortPrepare(BAD_VALUE);
            return;
        }
        if (!mAudioFormat->findData(kKeyESDS, &type, &esds_data, &esds_len)) {
            ALOGE("Failed to find codec init data for AAC substream.");
            abortPrepare(BAD_VALUE);
            return;
        }
        CHECK(NULL == mAudioCodecData);
        mAudioCodecDataSize = esds_len
                            + sizeof(sample_rate)
                            + sizeof(channel_count);
        mAudioCodecData = new uint8_t[mAudioCodecDataSize];
        if (NULL == mAudioCodecData) {
            ALOGE("Failed to allocate %u bytes for AAC substream codec aux"
                  " data.", mAudioCodecDataSize);
            mAudioCodecDataSize = 0;
            abortPrepare(BAD_VALUE);
            return;
        }
        // Pack the sample rate and channel count big-endian, followed by
        // the raw ESDS payload.
        uint8_t* tmp = mAudioCodecData;
        tmp[0] = static_cast<uint8_t>((sample_rate >> 24) & 0xFF);
        tmp[1] = static_cast<uint8_t>((sample_rate >> 16) & 0xFF);
        tmp[2] = static_cast<uint8_t>((sample_rate >> 8) & 0xFF);
        tmp[3] = static_cast<uint8_t>((sample_rate ) & 0xFF);
        tmp[4] = static_cast<uint8_t>((channel_count >> 24) & 0xFF);
        tmp[5] = static_cast<uint8_t>((channel_count >> 16) & 0xFF);
        tmp[6] = static_cast<uint8_t>((channel_count >> 8) & 0xFF);
        tmp[7] = static_cast<uint8_t>((channel_count ) & 0xFF);
        memcpy(tmp + 8, esds_data, esds_len);
    } else {
        ALOGE("Unsupported MIME type \"%s\" in audio substream", mime_type);
        abortPrepare(BAD_VALUE);
        return;
    }
    status_t err = mAudioSource->start();
    if (err != OK) {
        ALOGI("failed to start audio source, err=%d", err);
        abortPrepare(err);
        return;
    }
    mFlags |= PREPARING_CONNECTED;
    // Cached (network) sources wait for the buffering monitor to declare
    // prepare complete; local sources are done right now.
    if (mCachedSource != NULL) {
        postBufferingEvent_l();
    } else {
        finishAsyncPrepare_l();
    }
}
// Successful completion of an async prepare: notify the app, record the
// result, and transition the flags from PREPARING to PREPARED, waking any
// waiters.
void AAH_TXPlayer::finishAsyncPrepare_l() {
    notifyListener_l(MEDIA_PREPARED);
    mPrepareResult = OK;
    mFlags &= ~(PREPARING|PREPARE_CANCELLED|PREPARING_CONNECTED);
    mFlags |= PREPARED;
    mPreparedCondition.broadcast();
}
// Public start: an explicit start always clears any cache-underrun pause
// before delegating to the locked play path.
status_t AAH_TXPlayer::start() {
    Mutex::Autolock autoLock(mLock);
    mFlags &= ~CACHE_UNDERRUN;
    return play_l();
}
status_t AAH_TXPlayer::play_l() {
    // Already playing: nothing to do.
    if (mFlags & PLAYING) {
        return OK;
    }
    // Playback cannot begin until prepare has completed.
    if (!(mFlags & PREPARED)) {
        return INVALID_OPERATION;
    }
    {
        // The retransmit endpoint must have been supplied before the first
        // play; register it with the sender exactly once.
        Mutex::Autolock lock(mEndpointLock);
        if (!mEndpointValid) {
            return INVALID_OPERATION;
        }
        if (!mEndpointRegistered) {
            mProgramID = mAAH_Sender->registerEndpoint(mEndpoint);
            mEndpointRegistered = true;
        }
    }
    mFlags |= PLAYING;
    // Announce an un-paused (1:1 rate) clock transform and kick off the
    // transmit pump.
    updateClockTransform_l(false);
    postPumpAudioEvent_l(-1);
    return OK;
}
// Stop playback: pause the pipeline, then announce end-of-stream to the
// receivers.  Returns the result of the pause operation.
status_t AAH_TXPlayer::stop() {
    status_t ret = pause();
    // sendEOS_l expects mLock to be held, but pause() manages the lock
    // internally and has released it by the time it returns.  Take the
    // lock explicitly before queueing the EOS command packet (previously
    // sendEOS_l was called here without the lock held).
    Mutex::Autolock autoLock(mLock);
    sendEOS_l();
    return ret;
}
// Public pause: an explicit pause always clears any cache-underrun state
// before delegating to the locked pause path.
status_t AAH_TXPlayer::pause() {
    Mutex::Autolock autoLock(mLock);
    mFlags &= ~CACHE_UNDERRUN;
    return pause_l();
}
status_t AAH_TXPlayer::pause_l(bool doClockUpdate) {
    // Pausing while not playing is a no-op.
    if (!(mFlags & PLAYING)) {
        return OK;
    }
    cancelPlayerEvents(true /* keepBufferingGoing */);
    mFlags &= ~PLAYING;
    // Callers which are about to send EOS anyway (see the end-of-stream
    // path in onPumpAudio) pass false to skip announcing the rate change
    // to the receivers.
    if (doClockUpdate) {
        updateClockTransform_l(true);
    }
    return OK;
}
// Re-anchor the media-time <-> common-time transform at the current
// moment, setting the playback rate to 0 (paused) or 1:1 (playing), and
// announce the new transform to the receivers.
void AAH_TXPlayer::updateClockTransform_l(bool pause) {
    // record the new pause status so that onPumpAudio knows what rate to apply
    // when it initializes the transform
    mPlayRateIsPaused = pause;
    // if we haven't yet established a valid clock transform, then we can't
    // do anything here
    if (!mCurrentClockTransformValid) {
        return;
    }
    // sample the current common time
    int64_t commonTimeNow;
    if (OK != mCCHelper.getCommonTime(&commonTimeNow)) {
        ALOGE("updateClockTransform_l get common time failed");
        mCurrentClockTransformValid = false;
        return;
    }
    // convert the current common time to media time using the old
    // transform
    int64_t mediaTimeNow;
    if (!mCurrentClockTransform.doReverseTransform(
            commonTimeNow, &mediaTimeNow)) {
        ALOGE("updateClockTransform_l reverse transform failed");
        mCurrentClockTransformValid = false;
        return;
    }
    // calculate a new transform that preserves the old transform's
    // result for the current time
    mCurrentClockTransform.a_zero = mediaTimeNow;
    mCurrentClockTransform.b_zero = commonTimeNow;
    mCurrentClockTransform.a_to_b_numer = 1;
    mCurrentClockTransform.a_to_b_denom = pause ? 0 : 1;
    // send a packet announcing the new transform
    sp<TRTPControlPacket> packet = new TRTPControlPacket();
    packet->setClockTransform(mCurrentClockTransform);
    packet->setCommandID(TRTPControlPacket::kCommandNop);
    queuePacketToSender_l(packet);
}
void AAH_TXPlayer::sendEOS_l() {
sp<TRTPControlPacket> packet = new TRTPControlPacket();
packet->setCommandID(TRTPControlPacket::kCommandEOS);
queuePacketToSender_l(packet);
}
// We also report "playing" during a cache underrun, since playback will
// resume automatically once the cache refills.
bool AAH_TXPlayer::isPlaying() {
    return (mFlags & (PLAYING | CACHE_UNDERRUN)) != 0;
}
status_t AAH_TXPlayer::seekTo(int msec) {
    // Only honor the seek if the extractor supports seeking.
    if (mExtractorFlags & MediaExtractor::CAN_SEEK) {
        Mutex::Autolock autoLock(mLock);
        return seekTo_l(static_cast<int64_t>(msec) * 1000);
    }
    // Non-seekable source: report the seek as complete immediately.
    // NOTE(review): notifyListener_l is invoked here without mLock held;
    // it only forwards to sendEvent, but confirm this is intentional.
    notifyListener_l(MEDIA_SEEK_COMPLETE);
    return OK;
}
// Locked seek implementation: latch the seek target, invalidate the
// timing state so the transmit pump re-establishes it, and tell the
// receivers to flush whatever they have buffered.
status_t AAH_TXPlayer::seekTo_l(int64_t timeUs) {
    mIsSeeking = true;
    mSeekTimeUs = timeUs;
    mCurrentClockTransformValid = false;
    mLastQueuedMediaTimePTSValid = false;
    sp<TRTPControlPacket> flushPacket = new TRTPControlPacket();
    flushPacket->setCommandID(TRTPControlPacket::kCommandFlush);
    queuePacketToSender_l(flushPacket);
    return OK;
}
// Report the current playback position in milliseconds, derived from the
// common clock and the active media->common time transform.
status_t AAH_TXPlayer::getCurrentPosition(int *msec) {
    if (!msec) {
        return BAD_VALUE;
    }
    Mutex::Autolock lock(mLock);
    int position;
    if (mIsSeeking) {
        // While a seek is pending, report the seek target.
        position = mSeekTimeUs / 1000;
    } else if (mCurrentClockTransformValid) {
        // Map the current common time back into media time.
        int64_t commonTimeNow;
        if (OK != mCCHelper.getCommonTime(&commonTimeNow)) {
            ALOGE("getCurrentPosition get common time failed");
            return INVALID_OPERATION;
        }
        int64_t mediaTimeNow;
        if (!mCurrentClockTransform.doReverseTransform(commonTimeNow,
                                                       &mediaTimeNow)) {
            ALOGE("getCurrentPosition reverse transform failed");
            return INVALID_OPERATION;
        }
        position = static_cast<int>(mediaTimeNow / 1000);
    } else {
        // Playback has not established a transform yet; report 0.
        position = 0;
    }
    // Clamp into [0, duration] when the duration is known.
    int duration;
    if (getDuration_l(&duration) == OK) {
        *msec = clamp(position, 0, duration);
    } else {
        *msec = (position >= 0) ? position : 0;
    }
    return OK;
}
// Public duration accessor: validate the out-param, take the lock, and
// delegate to the locked implementation.
status_t AAH_TXPlayer::getDuration(int* msec) {
    if (msec == NULL) {
        return BAD_VALUE;
    }
    Mutex::Autolock lock(mLock);
    return getDuration_l(msec);
}
// Report the stream duration in milliseconds (rounded to nearest), or
// UNKNOWN_ERROR when no duration has been established (mDurationUs < 0).
status_t AAH_TXPlayer::getDuration_l(int* msec) {
    if (mDurationUs < 0) {
        return UNKNOWN_ERROR;
    }
    *msec = (mDurationUs + 500) / 1000;
    return OK;
}
// Public reset: take the API lock and run the full teardown.
status_t AAH_TXPlayer::reset() {
    Mutex::Autolock autoLock(mLock);
    reset_l();
    return OK;
}
// Tear down all playback state and return the player to its freshly
// constructed condition.
void AAH_TXPlayer::reset_l() {
    // If an async prepare is in flight, cancel it and unwind whatever
    // stage it has reached.
    if (mFlags & PREPARING) {
        mFlags |= PREPARE_CANCELLED;
        if (mConnectingDataSource != NULL) {
            ALOGI("interrupting the connection process");
            mConnectingDataSource->disconnect();
        }
        if (mFlags & PREPARING_CONNECTED) {
            // We are basically done preparing, we're just buffering
            // enough data to start playback, we can safely interrupt that.
            finishAsyncPrepare_l();
        }
    }
    // Wait for the prepare path to acknowledge the cancellation.
    while (mFlags & PREPARING) {
        mPreparedCondition.wait(mLock);
    }
    cancelPlayerEvents();
    // Let the receivers know the stream is over before dismantling state.
    sendEOS_l();
    mCachedSource.clear();
    if (mAudioSource != NULL) {
        mAudioSource->stop();
    }
    mAudioSource.clear();
    mAudioCodec = TRTPAudioPacket::kCodecInvalid;
    mAudioFormat = NULL;
    delete[] mAudioCodecData;
    mAudioCodecData = NULL;
    mAudioCodecDataSize = 0;
    mFlags = 0;
    mExtractorFlags = 0;
    mDurationUs = -1;
    mIsSeeking = false;
    mSeekTimeUs = 0;
    mUri.setTo("");
    mUriHeaders.clear();
    mFileSource.clear();
    mBitrate = -1;
    {
        // Unregister our endpoint from the sender (if it was registered).
        Mutex::Autolock lock(mEndpointLock);
        if (mAAH_Sender != NULL && mEndpointRegistered) {
            mAAH_Sender->unregisterEndpoint(mEndpoint);
        }
        mEndpointRegistered = false;
        mEndpointValid = false;
    }
    mProgramID = 0;
    mAAH_Sender.clear();
    mLastQueuedMediaTimePTSValid = false;
    mCurrentClockTransformValid = false;
    mPlayRateIsPaused = false;
    mTRTPVolume = 255;
}
// Looping is not implemented; accept the request and ignore it.
status_t AAH_TXPlayer::setLooping(int loop) {
    return OK;
}
// Identify this implementation to the media player service.
player_type AAH_TXPlayer::playerType() {
    return AAH_TX_PLAYER;
}
// Generic key/value parameters are not supported by this player.
status_t AAH_TXPlayer::setParameter(int key, const Parcel &request) {
    return ERROR_UNSUPPORTED;
}
// Generic key/value parameters are not supported by this player.
status_t AAH_TXPlayer::getParameter(int key, Parcel *reply) {
    return ERROR_UNSUPPORTED;
}
// The custom invoke() interface is no longer used for retransmit
// configuration; see setRetransmitEndpoint instead.
status_t AAH_TXPlayer::invoke(const Parcel& request, Parcel *reply) {
    return INVALID_OPERATION;
}
// Report our transport capabilities: pause is supported, but none of the
// seek variants are.
status_t AAH_TXPlayer::getMetadata(const media::Metadata::Filter& ids,
                                   Parcel* records) {
    using media::Metadata;
    Metadata metadata(records);
    metadata.appendBool(Metadata::kPauseAvailable, true);
    metadata.appendBool(Metadata::kSeekBackwardAvailable, false);
    metadata.appendBool(Metadata::kSeekForwardAvailable, false);
    metadata.appendBool(Metadata::kSeekAvailable, false);
    return OK;
}
// Set the transmit volume.  Per-channel volume is not supported; the left
// channel's level is applied to both (a mismatch is logged).
status_t AAH_TXPlayer::setVolume(float leftVolume, float rightVolume) {
    if (leftVolume != rightVolume) {
        ALOGE("%s does not support per channel volume: %f, %f",
              __PRETTY_FUNCTION__, leftVolume, rightVolume);
    }
    // Map [0.0, 1.0] onto the 8-bit TRTP volume range.  Bug fix: the
    // clamped value was previously computed but the raw leftVolume was
    // used, so out-of-range inputs could wrap during the uint8_t
    // conversion.
    float volume = clamp(leftVolume, 0.0f, 1.0f);
    Mutex::Autolock lock(mLock);
    mTRTPVolume = static_cast<uint8_t>((volume * 255.0) + 0.5);
    return OK;
}
// Accept and ignore: nothing in this player consumes the local audio
// stream type.
status_t AAH_TXPlayer::setAudioStreamType(audio_stream_type_t streamType) {
    return OK;
}
// Record the retransmit target supplied by the application.  Must be
// called before the endpoint is registered with the sender (which happens
// on first play); after that the endpoint is immutable.
status_t AAH_TXPlayer::setRetransmitEndpoint(
        const struct sockaddr_in* endpoint) {
    Mutex::Autolock lock(mLock);
    if (endpoint == NULL) {
        return BAD_VALUE;
    }
    // Once the endpoint has been registered, it may not be changed.
    if (mEndpointRegistered) {
        return INVALID_OPERATION;
    }
    // Stash the address and port exactly as delivered by the sockaddr_in
    // (network byte order).
    mEndpoint.addr = endpoint->sin_addr.s_addr;
    mEndpoint.port = endpoint->sin_port;
    mEndpointValid = true;
    return OK;
}
// Forward an event to the app-level listener via MediaPlayerBase.
void AAH_TXPlayer::notifyListener_l(int msg, int ext1, int ext2) {
    sendEvent(msg, ext1, ext2);
}
// Best-effort stream bitrate estimate in bits/sec.  Prefer deriving it
// from the cached source's total size and the known duration; otherwise
// fall back on the value summed from track metadata.  Returns false (and
// writes 0) when neither is available.
bool AAH_TXPlayer::getBitrate_l(int64_t *bitrate) {
    off64_t sizeBytes;
    if (mDurationUs >= 0 &&
        mCachedSource != NULL &&
        mCachedSource->getSize(&sizeBytes) == OK) {
        *bitrate = sizeBytes * 8000000ll / mDurationUs; // in bits/sec
        return true;
    }
    if (mBitrate >= 0) {
        *bitrate = mBitrate;
        return true;
    }
    *bitrate = 0;
    return false;
}
// Returns true iff cached duration is available/applicable.
bool AAH_TXPlayer::getCachedDuration_l(int64_t *durationUs, bool *eos) {
    int64_t bitrate;
    if (mCachedSource != NULL && getBitrate_l(&bitrate)) {
        // NOTE(review): getBitrate_l can report success with bitrate == 0
        // if the track metadata advertised a zero bitrate, which would
        // divide by zero below — confirm upstream guarantees a positive
        // value.
        status_t finalStatus;
        size_t cachedDataRemaining = mCachedSource->approxDataRemaining(
                &finalStatus);
        *durationUs = cachedDataRemaining * 8000000ll / bitrate;
        *eos = (finalStatus != OK);
        return true;
    }
    return false;
}
// Nudge the cached source (if any) so that it resumes fetching data.
void AAH_TXPlayer::ensureCacheIsFetching_l() {
    if (mCachedSource != NULL) {
        mCachedSource->resumeFetchingIfNecessary();
    }
}
// Schedule the next buffering poll one second out; no-op if a poll is
// already queued.
void AAH_TXPlayer::postBufferingEvent_l() {
    if (mBufferingEventPending) {
        return;
    }
    mBufferingEventPending = true;
    mQueue.postEventWithDelay(mBufferingEvent, 1000000ll);
}
// Schedule the next transmit-pump pass; a negative delay selects the
// default of 10ms.  No-op if a pump event is already queued.
void AAH_TXPlayer::postPumpAudioEvent_l(int64_t delayUs) {
    if (mPumpAudioEventPending) {
        return;
    }
    mPumpAudioEventPending = true;
    mQueue.postEventWithDelay(mPumpAudioEvent, delayUs < 0 ? 10000 : delayUs);
}
// Periodic (1Hz) cache monitor, run on the event queue thread.  Reports
// buffering progress to the app, pauses playback on cache underrun,
// resumes once the cache refills, and completes an in-flight prepare
// when enough data has been buffered.
void AAH_TXPlayer::onBufferingUpdate() {
    Mutex::Autolock autoLock(mLock);
    if (!mBufferingEventPending) {
        return;
    }
    mBufferingEventPending = false;
    if (mCachedSource != NULL) {
        status_t finalStatus;
        size_t cachedDataRemaining = mCachedSource->approxDataRemaining(
                &finalStatus);
        bool eos = (finalStatus != OK);
        if (eos) {
            if (finalStatus == ERROR_END_OF_STREAM) {
                notifyListener_l(MEDIA_BUFFERING_UPDATE, 100);
            }
            if (mFlags & PREPARING) {
                ALOGV("cache has reached EOS, prepare is done.");
                finishAsyncPrepare_l();
            }
        } else {
            // With a known bitrate, buffering progress can be reported as
            // a percentage of total duration.
            int64_t bitrate;
            if (getBitrate_l(&bitrate)) {
                size_t cachedSize = mCachedSource->cachedSize();
                int64_t cachedDurationUs = cachedSize * 8000000ll / bitrate;
                int percentage = (100.0 * (double) cachedDurationUs)
                               / mDurationUs;
                if (percentage > 100) {
                    percentage = 100;
                }
                notifyListener_l(MEDIA_BUFFERING_UPDATE, percentage);
            } else {
                // We don't know the bitrate of the stream, use absolute size
                // limits to maintain the cache.
                if ((mFlags & PLAYING) &&
                    !eos &&
                    (cachedDataRemaining < kLowWaterMarkBytes)) {
                    ALOGI("cache is running low (< %d) , pausing.",
                          kLowWaterMarkBytes);
                    mFlags |= CACHE_UNDERRUN;
                    pause_l();
                    ensureCacheIsFetching_l();
                    notifyListener_l(MEDIA_INFO, MEDIA_INFO_BUFFERING_START);
                } else if (eos || cachedDataRemaining > kHighWaterMarkBytes) {
                    if (mFlags & CACHE_UNDERRUN) {
                        ALOGI("cache has filled up (> %d), resuming.",
                              kHighWaterMarkBytes);
                        mFlags &= ~CACHE_UNDERRUN;
                        play_l();
                        notifyListener_l(MEDIA_INFO, MEDIA_INFO_BUFFERING_END);
                    } else if (mFlags & PREPARING) {
                        ALOGV("cache has filled up (> %d), prepare is done",
                              kHighWaterMarkBytes);
                        finishAsyncPrepare_l();
                    }
                }
            }
        }
    }
    // When the bitrate is known, also apply the duration-based
    // watermarks.
    int64_t cachedDurationUs;
    bool eos;
    if (getCachedDuration_l(&cachedDurationUs, &eos)) {
        ALOGV("cachedDurationUs = %.2f secs, eos=%d",
              cachedDurationUs / 1E6, eos);
        if ((mFlags & PLAYING) &&
            !eos &&
            (cachedDurationUs < kLowWaterMarkUs)) {
            ALOGI("cache is running low (%.2f secs) , pausing.",
                  cachedDurationUs / 1E6);
            mFlags |= CACHE_UNDERRUN;
            pause_l();
            ensureCacheIsFetching_l();
            notifyListener_l(MEDIA_INFO, MEDIA_INFO_BUFFERING_START);
        } else if (eos || cachedDurationUs > kHighWaterMarkUs) {
            if (mFlags & CACHE_UNDERRUN) {
                ALOGI("cache has filled up (%.2f secs), resuming.",
                      cachedDurationUs / 1E6);
                mFlags &= ~CACHE_UNDERRUN;
                play_l();
                notifyListener_l(MEDIA_INFO, MEDIA_INFO_BUFFERING_END);
            } else if (mFlags & PREPARING) {
                ALOGV("cache has filled up (%.2f secs), prepare is done",
                      cachedDurationUs / 1E6);
                finishAsyncPrepare_l();
            }
        }
    }
    // Re-arm ourselves for the next poll.
    postBufferingEvent_l();
}
// The transmit pump, run on the event queue thread.  Pulls access units
// from the audio source and queues them to the sender until the last
// queued PTS leads the common clock by kAAHBufferTimeUs, then re-arms
// itself.
void AAH_TXPlayer::onPumpAudio() {
    while (true) {
        Mutex::Autolock autoLock(mLock);
        // If this flag is clear, its because someone has externally canceled
        // this pump operation (probably because we a resetting/shutting down).
        // Get out immediately, do not reschedule ourselves.
        if (!mPumpAudioEventPending) {
            return;
        }
        // Start by checking if there is still work to be doing. If we have
        // never queued a payload (so we don't know what the last queued PTS is)
        // or we have never established a MediaTime->CommonTime transformation,
        // then we have work to do (one time through this loop should establish
        // both). Otherwise, we want to keep a fixed amt of presentation time
        // worth of data buffered. If we cannot get common time (service is
        // unavailable, or common time is undefined)) then we don't have a lot
        // of good options here. For now, signal an error up to the app level
        // and shut down the transmission pump.
        int64_t commonTimeNow;
        if (OK != mCCHelper.getCommonTime(&commonTimeNow)) {
            // Failed to get common time; either the service is down or common
            // time is not synced. Raise an error and shutdown the player.
            ALOGE("*** Cannot pump audio, unable to fetch common time."
                  " Shutting down.");
            notifyListener_l(MEDIA_ERROR, MEDIA_ERROR_UNKNOWN, UNKNOWN_ERROR);
            mPumpAudioEventPending = false;
            break;
        }
        if (mCurrentClockTransformValid && mLastQueuedMediaTimePTSValid) {
            int64_t mediaTimeNow;
            bool conversionResult = mCurrentClockTransform.doReverseTransform(
                    commonTimeNow,
                    &mediaTimeNow);
            CHECK(conversionResult);
            // Enough data is in flight once the last queued PTS leads the
            // current media time by kAAHBufferTimeUs or more; stop pumping
            // until the next scheduled pass.
            if ((mediaTimeNow +
                 kAAHBufferTimeUs -
                 mLastQueuedMediaTimePTS) <= 0) {
                break;
            }
        }
        MediaSource::ReadOptions options;
        if (mIsSeeking) {
            options.setSeekTo(mSeekTimeUs);
        }
        MediaBuffer* mediaBuffer;
        status_t err = mAudioSource->read(&mediaBuffer, &options);
        if (err != NO_ERROR) {
            if (err == ERROR_END_OF_STREAM) {
                ALOGI("*** %s reached end of stream", __PRETTY_FUNCTION__);
                notifyListener_l(MEDIA_BUFFERING_UPDATE, 100);
                notifyListener_l(MEDIA_PLAYBACK_COMPLETE);
                // Pause without a clock update; the EOS packet follows
                // immediately.
                pause_l(false);
                sendEOS_l();
            } else {
                ALOGE("*** %s read failed err=%d", __PRETTY_FUNCTION__, err);
            }
            return;
        }
        if (mIsSeeking) {
            mIsSeeking = false;
            notifyListener_l(MEDIA_SEEK_COMPLETE);
        }
        uint8_t* data = (static_cast<uint8_t*>(mediaBuffer->data()) +
                         mediaBuffer->range_offset());
        ALOGV("*** %s got media buffer data=[%02hhx %02hhx %02hhx %02hhx]"
              " offset=%d length=%d", __PRETTY_FUNCTION__,
              data[0], data[1], data[2], data[3],
              mediaBuffer->range_offset(), mediaBuffer->range_length());
        int64_t mediaTimeUs;
        CHECK(mediaBuffer->meta_data()->findInt64(kKeyTime, &mediaTimeUs));
        ALOGV("*** timeUs=%lld", mediaTimeUs);
        // First payload after start/seek: anchor the media->common time
        // transform kAAHStartupLeadTimeUs in the future.
        if (!mCurrentClockTransformValid) {
            if (OK == mCCHelper.getCommonTime(&commonTimeNow)) {
                mCurrentClockTransform.a_zero = mediaTimeUs;
                mCurrentClockTransform.b_zero = commonTimeNow +
                                                kAAHStartupLeadTimeUs;
                mCurrentClockTransform.a_to_b_numer = 1;
                mCurrentClockTransform.a_to_b_denom = mPlayRateIsPaused ? 0 : 1;
                mCurrentClockTransformValid = true;
            } else {
                // Failed to get common time; either the service is down or
                // common time is not synced. Raise an error and shutdown the
                // player.
                ALOGE("*** Cannot begin transmission, unable to fetch common"
                      " time. Dropping sample with pts=%lld", mediaTimeUs);
                notifyListener_l(MEDIA_ERROR,
                                 MEDIA_ERROR_UNKNOWN,
                                 UNKNOWN_ERROR);
                mPumpAudioEventPending = false;
                break;
            }
        }
        ALOGV("*** transmitting packet with pts=%lld", mediaTimeUs);
        sp<TRTPAudioPacket> packet = new TRTPAudioPacket();
        packet->setPTS(mediaTimeUs);
        packet->setSubstreamID(1);
        packet->setCodecType(mAudioCodec);
        packet->setVolume(mTRTPVolume);
        // TODO : introduce a throttle for this so we can control the
        // frequency with which transforms get sent.
        packet->setClockTransform(mCurrentClockTransform);
        packet->setAccessUnitData(data, mediaBuffer->range_length());
        // TODO : while its pretty much universally true that audio ES payloads
        // are all RAPs across all codecs, it might be a good idea to throttle
        // the frequency with which we send codec out of band data to the RXers.
        // If/when we do, we need to flag only those payloads which have
        // required out of band data attached to them as RAPs.
        packet->setRandomAccessPoint(true);
        if (mAudioCodecData && mAudioCodecDataSize) {
            packet->setAuxData(mAudioCodecData, mAudioCodecDataSize);
        }
        queuePacketToSender_l(packet);
        mediaBuffer->release();
        mLastQueuedMediaTimePTSValid = true;
        mLastQueuedMediaTimePTS = mediaTimeUs;
    }
    { // Explicit scope for the autolock pattern.
        Mutex::Autolock autoLock(mLock);
        // If someone externally has cleared this flag, its because we should be
        // shutting down. Do not reschedule ourselves.
        if (!mPumpAudioEventPending) {
            return;
        }
        // Looks like no one canceled us explicitly. Clear our flag and post a
        // new event to ourselves.
        mPumpAudioEventPending = false;
        postPumpAudioEvent_l(10000);
    }
}
// Address, stamp, pack, and post |packet| to the sender's handler thread.
// Silently drops the packet if the sender is gone or the endpoint has
// been invalidated (e.g. mid-reset).
void AAH_TXPlayer::queuePacketToSender_l(const sp<TRTPPacket>& packet) {
    if (mAAH_Sender == NULL) {
        return;
    }
    sp<AMessage> message = new AMessage(AAH_TXSender::kWhatSendPacket,
                                        mAAH_Sender->handlerID());
    {
        Mutex::Autolock lock(mEndpointLock);
        if (!mEndpointValid) {
            return;
        }
        message->setInt32(AAH_TXSender::kSendPacketIPAddr, mEndpoint.addr);
        message->setInt32(AAH_TXSender::kSendPacketPort, mEndpoint.port);
    }
    packet->setProgramID(mProgramID);
    // Bound the window during which the sender may retransmit this packet.
    packet->setExpireTime(systemTime() + kAAHRetryKeepAroundTimeNs);
    packet->pack();
    message->setObject(AAH_TXSender::kSendPacketTRTPPacket, packet);
    message->post();
}
} // namespace android