John Grossman 720aa28279 Switch the way we configure for MediaPlayer retransmission.
This is a cherry-pick of I6ab07d89b2eeb0650e634b8c3b7a0b36aba4e7dd
with merge conflicts addressed by hand and additional changes made in
response to code review feedback.

Move in the direction of a more publishable API for configuring a
media player for retransmission.  It used to be that we used a custom
invoke and a modified URL (prefixed with aahTX://).  There are many
issues with this technique and it was never meant to stand the test of
time.

This CL gets rid of all that.  A new (but currently hidden) method was
introduced to the java level MediaPlayer API, called
setRetransmitTarget(InetSocketAddress), which allows an app writer to
set the retransmit target.  For now, this method needs to be called
before a call to setDataSource (which is pretty unusual for the
MediaPlayer API) because this mid level code uses this as a cue to
instantiate an aahTX player instead of relying on the data source to
select a player.  When retranmit functionality becomes part of the
existing android player implemenation, this
set-retrans-before-set-data-source behavior can go away, along with
the aahTX player itself.

Change-Id: I3b46c5227bbf69acb2f3cc4f93cfccad9777be98
Signed-off-by: John Grossman <johngro@google.com>
2012-03-01 14:41:35 -08:00

604 lines
18 KiB
C++

/*
* Copyright (C) 2011 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define LOG_TAG "LibAAH_RTP"
#include <media/stagefright/foundation/ADebug.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <media/stagefright/foundation/AMessage.h>
#include <utils/misc.h>
#include "aah_tx_player.h"
#include "aah_tx_sender.h"
namespace android {
const char* AAH_TXSender::kSendPacketIPAddr = "ipaddr";
const char* AAH_TXSender::kSendPacketPort = "port";
const char* AAH_TXSender::kSendPacketTRTPPacket = "trtp";
const int AAH_TXSender::kRetryTrimIntervalUs = 100000;
const int AAH_TXSender::kHeartbeatIntervalUs = 1000000;
const int AAH_TXSender::kRetryBufferCapacity = 100;
const nsecs_t AAH_TXSender::kHeartbeatTimeout = 600ull * 1000000000ull;
Mutex AAH_TXSender::sLock;
wp<AAH_TXSender> AAH_TXSender::sInstance;
uint32_t AAH_TXSender::sNextEpoch;
bool AAH_TXSender::sNextEpochValid = false;
AAH_TXSender::AAH_TXSender() : mSocket(-1) {
mLastSentPacketTime = systemTime();
}
sp<AAH_TXSender> AAH_TXSender::GetInstance() {
Mutex::Autolock autoLock(sLock);
sp<AAH_TXSender> sender = sInstance.promote();
if (sender == NULL) {
sender = new AAH_TXSender();
if (sender == NULL) {
return NULL;
}
sender->mLooper = new ALooper();
if (sender->mLooper == NULL) {
return NULL;
}
sender->mReflector = new AHandlerReflector<AAH_TXSender>(sender.get());
if (sender->mReflector == NULL) {
return NULL;
}
sender->mSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (sender->mSocket == -1) {
ALOGW("%s unable to create socket", __PRETTY_FUNCTION__);
return NULL;
}
struct sockaddr_in bind_addr;
memset(&bind_addr, 0, sizeof(bind_addr));
bind_addr.sin_family = AF_INET;
if (bind(sender->mSocket,
reinterpret_cast<const sockaddr*>(&bind_addr),
sizeof(bind_addr)) < 0) {
ALOGW("%s unable to bind socket (errno %d)",
__PRETTY_FUNCTION__, errno);
return NULL;
}
sender->mRetryReceiver = new RetryReceiver(sender.get());
if (sender->mRetryReceiver == NULL) {
return NULL;
}
sender->mLooper->setName("AAH_TXSender");
sender->mLooper->registerHandler(sender->mReflector);
sender->mLooper->start(false, false, PRIORITY_AUDIO);
if (sender->mRetryReceiver->run("AAH_TXSenderRetry", PRIORITY_AUDIO)
!= OK) {
ALOGW("%s unable to start retry thread", __PRETTY_FUNCTION__);
return NULL;
}
sInstance = sender;
}
return sender;
}
AAH_TXSender::~AAH_TXSender() {
mLooper->stop();
mLooper->unregisterHandler(mReflector->id());
if (mRetryReceiver != NULL) {
mRetryReceiver->requestExit();
mRetryReceiver->mWakeupEvent.setEvent();
if (mRetryReceiver->requestExitAndWait() != OK) {
ALOGW("%s shutdown of retry receiver failed", __PRETTY_FUNCTION__);
}
mRetryReceiver->mSender = NULL;
mRetryReceiver.clear();
}
if (mSocket != -1) {
close(mSocket);
}
}
// Return the next epoch number usable for a newly instantiated endpoint.
uint32_t AAH_TXSender::getNextEpoch() {
Mutex::Autolock autoLock(sLock);
if (sNextEpochValid) {
sNextEpoch = (sNextEpoch + 1) & TRTPPacket::kTRTPEpochMask;
} else {
sNextEpoch = ns2ms(systemTime()) & TRTPPacket::kTRTPEpochMask;
sNextEpochValid = true;
}
return sNextEpoch;
}
// Notify the sender that a player has started sending to this endpoint.
// Returns a program ID for use by the calling player.
uint16_t AAH_TXSender::registerEndpoint(const Endpoint& endpoint) {
Mutex::Autolock lock(mEndpointLock);
EndpointState* eps = mEndpointMap.valueFor(endpoint);
if (eps) {
eps->playerRefCount++;
} else {
eps = new EndpointState(getNextEpoch());
mEndpointMap.add(endpoint, eps);
}
// if this is the first registered endpoint, then send a message to start
// trimming retry buffers and a message to start sending heartbeats.
if (mEndpointMap.size() == 1) {
sp<AMessage> trimMessage = new AMessage(kWhatTrimRetryBuffers,
handlerID());
trimMessage->post(kRetryTrimIntervalUs);
sp<AMessage> heartbeatMessage = new AMessage(kWhatSendHeartbeats,
handlerID());
heartbeatMessage->post(kHeartbeatIntervalUs);
}
eps->nextProgramID++;
return eps->nextProgramID;
}
// Notify the sender that a player has ceased sending to this endpoint.
// An endpoint's state can not be deleted until all of the endpoint's
// registered players have called unregisterEndpoint.
void AAH_TXSender::unregisterEndpoint(const Endpoint& endpoint) {
Mutex::Autolock lock(mEndpointLock);
EndpointState* eps = mEndpointMap.valueFor(endpoint);
if (eps) {
eps->playerRefCount--;
CHECK(eps->playerRefCount >= 0);
}
}
void AAH_TXSender::onMessageReceived(const sp<AMessage>& msg) {
switch (msg->what()) {
case kWhatSendPacket:
onSendPacket(msg);
break;
case kWhatTrimRetryBuffers:
trimRetryBuffers();
break;
case kWhatSendHeartbeats:
sendHeartbeats();
break;
default:
TRESPASS();
break;
}
}
void AAH_TXSender::onSendPacket(const sp<AMessage>& msg) {
sp<RefBase> obj;
CHECK(msg->findObject(kSendPacketTRTPPacket, &obj));
sp<TRTPPacket> packet = static_cast<TRTPPacket*>(obj.get());
uint32_t ipAddr;
CHECK(msg->findInt32(kSendPacketIPAddr,
reinterpret_cast<int32_t*>(&ipAddr)));
int32_t port32;
CHECK(msg->findInt32(kSendPacketPort, &port32));
uint16_t port = port32;
Mutex::Autolock lock(mEndpointLock);
doSendPacket_l(packet, Endpoint(ipAddr, port));
mLastSentPacketTime = systemTime();
}
void AAH_TXSender::doSendPacket_l(const sp<TRTPPacket>& packet,
const Endpoint& endpoint) {
EndpointState* eps = mEndpointMap.valueFor(endpoint);
if (!eps) {
// the endpoint state has disappeared, so the player that sent this
// packet must be dead.
return;
}
// assign the packet's sequence number
packet->setEpoch(eps->epoch);
packet->setSeqNumber(eps->trtpSeqNumber++);
// add the packet to the retry buffer
RetryBuffer& retry = eps->retry;
retry.push_back(packet);
// send the packet
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = endpoint.addr;
addr.sin_port = endpoint.port;
ssize_t result = sendto(mSocket,
packet->getPacket(),
packet->getPacketLen(),
0,
(const struct sockaddr *) &addr,
sizeof(addr));
if (result == -1) {
ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
}
}
void AAH_TXSender::trimRetryBuffers() {
Mutex::Autolock lock(mEndpointLock);
nsecs_t localTimeNow = systemTime();
Vector<Endpoint> endpointsToRemove;
for (size_t i = 0; i < mEndpointMap.size(); i++) {
EndpointState* eps = mEndpointMap.editValueAt(i);
RetryBuffer& retry = eps->retry;
while (!retry.isEmpty()) {
if (retry[0]->getExpireTime() < localTimeNow) {
retry.pop_front();
} else {
break;
}
}
if (retry.isEmpty() && eps->playerRefCount == 0) {
endpointsToRemove.add(mEndpointMap.keyAt(i));
}
}
// remove the state for any endpoints that are no longer in use
for (size_t i = 0; i < endpointsToRemove.size(); i++) {
Endpoint& e = endpointsToRemove.editItemAt(i);
ALOGD("*** %s removing endpoint addr=%08x",
__PRETTY_FUNCTION__, e.addr);
size_t index = mEndpointMap.indexOfKey(e);
delete mEndpointMap.valueAt(index);
mEndpointMap.removeItemsAt(index);
}
// schedule the next trim
if (mEndpointMap.size()) {
sp<AMessage> trimMessage = new AMessage(kWhatTrimRetryBuffers,
handlerID());
trimMessage->post(kRetryTrimIntervalUs);
}
}
void AAH_TXSender::sendHeartbeats() {
Mutex::Autolock lock(mEndpointLock);
if (shouldSendHeartbeats_l()) {
for (size_t i = 0; i < mEndpointMap.size(); i++) {
EndpointState* eps = mEndpointMap.editValueAt(i);
const Endpoint& ep = mEndpointMap.keyAt(i);
sp<TRTPControlPacket> packet = new TRTPControlPacket();
packet->setCommandID(TRTPControlPacket::kCommandNop);
packet->setExpireTime(systemTime() +
AAH_TXPlayer::kAAHRetryKeepAroundTimeNs);
packet->pack();
doSendPacket_l(packet, ep);
}
}
// schedule the next heartbeat
if (mEndpointMap.size()) {
sp<AMessage> heartbeatMessage = new AMessage(kWhatSendHeartbeats,
handlerID());
heartbeatMessage->post(kHeartbeatIntervalUs);
}
}
bool AAH_TXSender::shouldSendHeartbeats_l() {
// assert(holding endpoint lock)
return (systemTime() < (mLastSentPacketTime + kHeartbeatTimeout));
}
// Receiver
// initial 4-byte ID of a retry request packet
const uint32_t AAH_TXSender::RetryReceiver::kRetryRequestID = 'Treq';
// initial 4-byte ID of a retry NAK packet
const uint32_t AAH_TXSender::RetryReceiver::kRetryNakID = 'Tnak';
// initial 4-byte ID of a fast start request packet
const uint32_t AAH_TXSender::RetryReceiver::kFastStartRequestID = 'Tfst';
AAH_TXSender::RetryReceiver::RetryReceiver(AAH_TXSender* sender)
: Thread(false),
mSender(sender) {}
AAH_TXSender::RetryReceiver::~RetryReceiver() {
mWakeupEvent.clearPendingEvents();
}
// Returns true if val is within the interval bounded inclusively by
// start and end. Also handles the case where there is a rollover of the
// range between start and end.
template <typename T>
static inline bool withinIntervalWithRollover(T val, T start, T end) {
return ((start <= end && val >= start && val <= end) ||
(start > end && (val >= start || val <= end)));
}
bool AAH_TXSender::RetryReceiver::threadLoop() {
struct pollfd pollFds[2];
pollFds[0].fd = mSender->mSocket;
pollFds[0].events = POLLIN;
pollFds[0].revents = 0;
pollFds[1].fd = mWakeupEvent.getWakeupHandle();
pollFds[1].events = POLLIN;
pollFds[1].revents = 0;
int pollResult = poll(pollFds, NELEM(pollFds), -1);
if (pollResult == -1) {
ALOGE("%s poll failed", __PRETTY_FUNCTION__);
return false;
}
if (exitPending()) {
ALOGI("*** %s exiting", __PRETTY_FUNCTION__);
return false;
}
if (pollFds[0].revents) {
handleRetryRequest();
}
return true;
}
void AAH_TXSender::RetryReceiver::handleRetryRequest() {
ALOGV("*** RX %s start", __PRETTY_FUNCTION__);
RetryPacket request;
struct sockaddr requestSrcAddr;
socklen_t requestSrcAddrLen = sizeof(requestSrcAddr);
ssize_t result = recvfrom(mSender->mSocket, &request, sizeof(request), 0,
&requestSrcAddr, &requestSrcAddrLen);
if (result == -1) {
ALOGE("%s recvfrom failed, errno=%d", __PRETTY_FUNCTION__, errno);
return;
}
if (static_cast<size_t>(result) < sizeof(RetryPacket)) {
ALOGW("%s short packet received", __PRETTY_FUNCTION__);
return;
}
uint32_t host_request_id = ntohl(request.id);
if ((host_request_id != kRetryRequestID) &&
(host_request_id != kFastStartRequestID)) {
ALOGW("%s received retry request with bogus ID (%08x)",
__PRETTY_FUNCTION__, host_request_id);
return;
}
Endpoint endpoint(request.endpointIP, request.endpointPort);
Mutex::Autolock lock(mSender->mEndpointLock);
EndpointState* eps = mSender->mEndpointMap.valueFor(endpoint);
if (eps == NULL || eps->retry.isEmpty()) {
// we have no retry buffer or an empty retry buffer for this endpoint,
// so NAK the entire request
RetryPacket nak = request;
nak.id = htonl(kRetryNakID);
result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
&requestSrcAddr, requestSrcAddrLen);
if (result == -1) {
ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
}
return;
}
RetryBuffer& retry = eps->retry;
uint16_t startSeq = ntohs(request.seqStart);
uint16_t endSeq = ntohs(request.seqEnd);
uint16_t retryFirstSeq = retry[0]->getSeqNumber();
uint16_t retryLastSeq = retry[retry.size() - 1]->getSeqNumber();
// If this is a fast start, then force the start of the retry to match the
// start of the retransmit ring buffer (unless the end of the retransmit
// ring buffer is already past the point of fast start)
if ((host_request_id == kFastStartRequestID) &&
!((startSeq - retryFirstSeq) & 0x8000)) {
startSeq = retryFirstSeq;
}
int startIndex;
if (withinIntervalWithRollover(startSeq, retryFirstSeq, retryLastSeq)) {
startIndex = static_cast<uint16_t>(startSeq - retryFirstSeq);
} else {
startIndex = -1;
}
int endIndex;
if (withinIntervalWithRollover(endSeq, retryFirstSeq, retryLastSeq)) {
endIndex = static_cast<uint16_t>(endSeq - retryFirstSeq);
} else {
endIndex = -1;
}
if (startIndex == -1 && endIndex == -1) {
// no part of the request range is found in the retry buffer
RetryPacket nak = request;
nak.id = htonl(kRetryNakID);
result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
&requestSrcAddr, requestSrcAddrLen);
if (result == -1) {
ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
}
return;
}
if (startIndex == -1) {
// NAK a subrange at the front of the request range
RetryPacket nak = request;
nak.id = htonl(kRetryNakID);
nak.seqEnd = htons(retryFirstSeq - 1);
result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
&requestSrcAddr, requestSrcAddrLen);
if (result == -1) {
ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
}
startIndex = 0;
} else if (endIndex == -1) {
// NAK a subrange at the back of the request range
RetryPacket nak = request;
nak.id = htonl(kRetryNakID);
nak.seqStart = htons(retryLastSeq + 1);
result = sendto(mSender->mSocket, &nak, sizeof(nak), 0,
&requestSrcAddr, requestSrcAddrLen);
if (result == -1) {
ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
}
endIndex = retry.size() - 1;
}
// send the retry packets
for (int i = startIndex; i <= endIndex; i++) {
const sp<TRTPPacket>& replyPacket = retry[i];
result = sendto(mSender->mSocket,
replyPacket->getPacket(),
replyPacket->getPacketLen(),
0,
&requestSrcAddr,
requestSrcAddrLen);
if (result == -1) {
ALOGW("%s sendto failed", __PRETTY_FUNCTION__);
}
}
}
// Endpoint
AAH_TXSender::Endpoint::Endpoint()
: addr(0)
, port(0) { }
AAH_TXSender::Endpoint::Endpoint(uint32_t a, uint16_t p)
: addr(a)
, port(p) {}
bool AAH_TXSender::Endpoint::operator<(const Endpoint& other) const {
return ((addr < other.addr) ||
(addr == other.addr && port < other.port));
}
// EndpointState
AAH_TXSender::EndpointState::EndpointState(uint32_t _epoch)
: retry(kRetryBufferCapacity)
, playerRefCount(1)
, trtpSeqNumber(0)
, nextProgramID(0)
, epoch(_epoch) { }
// CircularBuffer
template <typename T>
CircularBuffer<T>::CircularBuffer(size_t capacity)
: mCapacity(capacity)
, mHead(0)
, mTail(0)
, mFillCount(0) {
mBuffer = new T[capacity];
}
template <typename T>
CircularBuffer<T>::~CircularBuffer() {
delete [] mBuffer;
}
template <typename T>
void CircularBuffer<T>::push_back(const T& item) {
if (this->isFull()) {
this->pop_front();
}
mBuffer[mHead] = item;
mHead = (mHead + 1) % mCapacity;
mFillCount++;
}
template <typename T>
void CircularBuffer<T>::pop_front() {
CHECK(!isEmpty());
mBuffer[mTail] = T();
mTail = (mTail + 1) % mCapacity;
mFillCount--;
}
template <typename T>
size_t CircularBuffer<T>::size() const {
return mFillCount;
}
template <typename T>
bool CircularBuffer<T>::isFull() const {
return (mFillCount == mCapacity);
}
template <typename T>
bool CircularBuffer<T>::isEmpty() const {
return (mFillCount == 0);
}
template <typename T>
const T& CircularBuffer<T>::itemAt(size_t index) const {
CHECK(index < mFillCount);
return mBuffer[(mTail + index) % mCapacity];
}
template <typename T>
const T& CircularBuffer<T>::operator[](size_t index) const {
return itemAt(index);
}
} // namespace android