Compare commits

...

12 Commits

Author SHA1 Message Date
e17e8077d2 wip 2021-07-04 18:16:17 +02:00
40721126e2 buffering_thread 2021-07-04 17:29:50 +02:00
05c8ed1640 Add missing error log
Log video buffer initialization failure in v4l2_sink.
2021-07-04 17:29:50 +02:00
5caeb2f624 Notify new frames via callbacks
Currently, a frame is available to the consumer as soon as it is pushed
by the producer (which can detect if the previous frame is skipped).

Notify the new frames (and frame skipped) via callbacks instead.

This paves the way to add (optional) buffering, which will introduce a
delay between the time when the frame is produced and the time it is
available to be consumed.
2021-07-04 17:29:50 +02:00
e0fc1b80d2 Extract current video_buffer to frame_buffer
The current video buffer only stores one pending frame.

In order to add a new buffering feature, move this part to a separate
"frame buffer". Keep the video_buffer, which currently delegates all its
calls to the frame_buffer.
2021-07-04 17:29:50 +02:00
b9a7bd049d Rename video_buffer to sc_video_buffer
Add a scrcpy-specific prefix.
2021-07-04 17:29:50 +02:00
5009735032 Relax v4l2_sink lock constraints
To fix a data race, commit 5caeab5f6d
called video_buffer_push() and video_buffer_consume() under the
v4l2_sink lock.

Instead, use the previous_skipped indication (initialized with video
buffer locked) to lock only for protecting the has_frame flag.

This enables the possibility for the video_buffer to notify new frames
via callbacks without lock inversion issues.
2021-07-04 17:29:50 +02:00
f2da6662b5 Move include fps_counter
The fps_counter is not used from video_buffer.
2021-07-04 17:29:50 +02:00
59dc46d35c Remove obsolete comment
Commit 2a94a2b119 removed video_buffer
callbacks, the comment is now meaningless.
2021-07-04 17:29:50 +02:00
cf9889c5b7 Rename queue to sc_queue
Add a scrcpy-specific prefix.
2021-07-04 17:29:50 +02:00
0faadc7590 Replace delay by deadline in timedwait()
The function sc_cond_timedwait() accepted a parameter representing the
max duration to wait, because it internally uses SDL_CondWaitTimeout().

Instead, accept a deadline, to be consistent with
pthread_cond_timedwait().
2021-07-04 17:29:50 +02:00
1dcd3f295d Wrap tick API
This avoids to use the SDL timer API directly, and will allow to handle
generic ticks (possibly negative).
2021-07-04 17:29:14 +02:00
18 changed files with 502 additions and 163 deletions

View File

@ -10,6 +10,7 @@ src = [
'src/event_converter.c', 'src/event_converter.c',
'src/file_handler.c', 'src/file_handler.c',
'src/fps_counter.c', 'src/fps_counter.c',
'src/frame_buffer.c',
'src/input_manager.c', 'src/input_manager.c',
'src/opengl.c', 'src/opengl.c',
'src/receiver.c', 'src/receiver.c',

View File

@ -1,7 +1,6 @@
#include "fps_counter.h" #include "fps_counter.h"
#include <assert.h> #include <assert.h>
#include <SDL2/SDL_timer.h>
#include "util/log.h" #include "util/log.h"
@ -82,14 +81,12 @@ run_fps_counter(void *data) {
sc_cond_wait(&counter->state_cond, &counter->mutex); sc_cond_wait(&counter->state_cond, &counter->mutex);
} }
while (!counter->interrupted && is_started(counter)) { while (!counter->interrupted && is_started(counter)) {
uint32_t now = SDL_GetTicks(); sc_tick now = sc_tick_now();
check_interval_expired(counter, now); check_interval_expired(counter, now);
assert(counter->next_timestamp > now);
uint32_t remaining = counter->next_timestamp - now;
// ignore the reason (timeout or signaled), we just loop anyway // ignore the reason (timeout or signaled), we just loop anyway
sc_cond_timedwait(&counter->state_cond, &counter->mutex, remaining); sc_cond_timedwait(&counter->state_cond, &counter->mutex,
counter->next_timestamp);
} }
} }
sc_mutex_unlock(&counter->mutex); sc_mutex_unlock(&counter->mutex);
@ -99,7 +96,7 @@ run_fps_counter(void *data) {
bool bool
fps_counter_start(struct fps_counter *counter) { fps_counter_start(struct fps_counter *counter) {
sc_mutex_lock(&counter->mutex); sc_mutex_lock(&counter->mutex);
counter->next_timestamp = SDL_GetTicks() + FPS_COUNTER_INTERVAL_MS; counter->next_timestamp = sc_tick_now() + FPS_COUNTER_INTERVAL_MS;
counter->nr_rendered = 0; counter->nr_rendered = 0;
counter->nr_skipped = 0; counter->nr_skipped = 0;
sc_mutex_unlock(&counter->mutex); sc_mutex_unlock(&counter->mutex);
@ -165,7 +162,7 @@ fps_counter_add_rendered_frame(struct fps_counter *counter) {
} }
sc_mutex_lock(&counter->mutex); sc_mutex_lock(&counter->mutex);
uint32_t now = SDL_GetTicks(); sc_tick now = sc_tick_now();
check_interval_expired(counter, now); check_interval_expired(counter, now);
++counter->nr_rendered; ++counter->nr_rendered;
sc_mutex_unlock(&counter->mutex); sc_mutex_unlock(&counter->mutex);
@ -178,7 +175,7 @@ fps_counter_add_skipped_frame(struct fps_counter *counter) {
} }
sc_mutex_lock(&counter->mutex); sc_mutex_lock(&counter->mutex);
uint32_t now = SDL_GetTicks(); sc_tick now = sc_tick_now();
check_interval_expired(counter, now); check_interval_expired(counter, now);
++counter->nr_skipped; ++counter->nr_skipped;
sc_mutex_unlock(&counter->mutex); sc_mutex_unlock(&counter->mutex);

View File

@ -24,7 +24,7 @@ struct fps_counter {
bool interrupted; bool interrupted;
unsigned nr_rendered; unsigned nr_rendered;
unsigned nr_skipped; unsigned nr_skipped;
uint32_t next_timestamp; sc_tick next_timestamp;
}; };
bool bool

88
app/src/frame_buffer.c Normal file
View File

@ -0,0 +1,88 @@
#include "frame_buffer.h"
#include <assert.h>
#include <libavutil/avutil.h>
#include <libavformat/avformat.h>
#include "util/log.h"
bool
sc_frame_buffer_init(struct sc_frame_buffer *fb) {
fb->pending_frame = av_frame_alloc();
if (!fb->pending_frame) {
return false;
}
fb->tmp_frame = av_frame_alloc();
if (!fb->tmp_frame) {
av_frame_free(&fb->pending_frame);
return false;
}
bool ok = sc_mutex_init(&fb->mutex);
if (!ok) {
av_frame_free(&fb->pending_frame);
av_frame_free(&fb->tmp_frame);
return false;
}
// there is initially no frame, so consider it has already been consumed
fb->pending_frame_consumed = true;
return true;
}
void
sc_frame_buffer_destroy(struct sc_frame_buffer *fb) {
sc_mutex_destroy(&fb->mutex);
av_frame_free(&fb->pending_frame);
av_frame_free(&fb->tmp_frame);
}
static inline void
swap_frames(AVFrame **lhs, AVFrame **rhs) {
AVFrame *tmp = *lhs;
*lhs = *rhs;
*rhs = tmp;
}
bool
sc_frame_buffer_push(struct sc_frame_buffer *fb, const AVFrame *frame,
bool *previous_frame_skipped) {
sc_mutex_lock(&fb->mutex);
// Use a temporary frame to preserve pending_frame in case of error.
// tmp_frame is an empty frame, no need to call av_frame_unref() beforehand.
int r = av_frame_ref(fb->tmp_frame, frame);
if (r) {
LOGE("Could not ref frame: %d", r);
return false;
}
// Now that av_frame_ref() succeeded, we can replace the previous
// pending_frame
swap_frames(&fb->pending_frame, &fb->tmp_frame);
av_frame_unref(fb->tmp_frame);
if (previous_frame_skipped) {
*previous_frame_skipped = !fb->pending_frame_consumed;
}
fb->pending_frame_consumed = false;
sc_mutex_unlock(&fb->mutex);
return true;
}
void
sc_frame_buffer_consume(struct sc_frame_buffer *fb, AVFrame *dst) {
sc_mutex_lock(&fb->mutex);
assert(!fb->pending_frame_consumed);
fb->pending_frame_consumed = true;
av_frame_move_ref(dst, fb->pending_frame);
// av_frame_move_ref() resets its source frame, so no need to call
// av_frame_unref()
sc_mutex_unlock(&fb->mutex);
}

44
app/src/frame_buffer.h Normal file
View File

@ -0,0 +1,44 @@
#ifndef SC_FRAME_BUFFER_H
#define SC_FRAME_BUFFER_H
#include "common.h"
#include <stdbool.h>
#include "util/thread.h"
// forward declarations
typedef struct AVFrame AVFrame;
/**
* A frame buffer holds 1 pending frame, which is the last frame received from
* the producer (typically, the decoder).
*
* If a pending frame has not been consumed when the producer pushes a new
* frame, then it is lost. The intent is to always provide access to the very
* last frame to minimize latency.
*/
struct sc_frame_buffer {
AVFrame *pending_frame;
AVFrame *tmp_frame; // To preserve the pending frame on error
sc_mutex mutex;
bool pending_frame_consumed;
};
bool
sc_frame_buffer_init(struct sc_frame_buffer *fb);
void
sc_frame_buffer_destroy(struct sc_frame_buffer *fb);
bool
sc_frame_buffer_push(struct sc_frame_buffer *fb, const AVFrame *frame,
bool *skipped);
void
sc_frame_buffer_consume(struct sc_frame_buffer *fb, AVFrame *dst);
#endif

View File

@ -57,9 +57,9 @@ record_packet_delete(struct record_packet *rec) {
static void static void
recorder_queue_clear(struct recorder_queue *queue) { recorder_queue_clear(struct recorder_queue *queue) {
while (!queue_is_empty(queue)) { while (!sc_queue_is_empty(queue)) {
struct record_packet *rec; struct record_packet *rec;
queue_take(queue, next, &rec); sc_queue_take(queue, next, &rec);
record_packet_delete(rec); record_packet_delete(rec);
} }
} }
@ -135,14 +135,14 @@ run_recorder(void *data) {
for (;;) { for (;;) {
sc_mutex_lock(&recorder->mutex); sc_mutex_lock(&recorder->mutex);
while (!recorder->stopped && queue_is_empty(&recorder->queue)) { while (!recorder->stopped && sc_queue_is_empty(&recorder->queue)) {
sc_cond_wait(&recorder->queue_cond, &recorder->mutex); sc_cond_wait(&recorder->queue_cond, &recorder->mutex);
} }
// if stopped is set, continue to process the remaining events (to // if stopped is set, continue to process the remaining events (to
// finish the recording) before actually stopping // finish the recording) before actually stopping
if (recorder->stopped && queue_is_empty(&recorder->queue)) { if (recorder->stopped && sc_queue_is_empty(&recorder->queue)) {
sc_mutex_unlock(&recorder->mutex); sc_mutex_unlock(&recorder->mutex);
struct record_packet *last = recorder->previous; struct record_packet *last = recorder->previous;
if (last) { if (last) {
@ -161,7 +161,7 @@ run_recorder(void *data) {
} }
struct record_packet *rec; struct record_packet *rec;
queue_take(&recorder->queue, next, &rec); sc_queue_take(&recorder->queue, next, &rec);
sc_mutex_unlock(&recorder->mutex); sc_mutex_unlock(&recorder->mutex);
@ -235,7 +235,7 @@ recorder_open(struct recorder *recorder, const AVCodec *input_codec) {
goto error_mutex_destroy; goto error_mutex_destroy;
} }
queue_init(&recorder->queue); sc_queue_init(&recorder->queue);
recorder->stopped = false; recorder->stopped = false;
recorder->failed = false; recorder->failed = false;
recorder->header_written = false; recorder->header_written = false;
@ -340,7 +340,7 @@ recorder_push(struct recorder *recorder, const AVPacket *packet) {
return false; return false;
} }
queue_push(&recorder->queue, next, rec); sc_queue_push(&recorder->queue, next, rec);
sc_cond_signal(&recorder->queue_cond); sc_cond_signal(&recorder->queue_cond);
sc_mutex_unlock(&recorder->mutex); sc_mutex_unlock(&recorder->mutex);

View File

@ -17,7 +17,7 @@ struct record_packet {
struct record_packet *next; struct record_packet *next;
}; };
struct recorder_queue QUEUE(struct record_packet); struct recorder_queue SC_QUEUE(struct record_packet);
struct recorder { struct recorder {
struct sc_packet_sink packet_sink; // packet sink trait struct sc_packet_sink packet_sink; // packet sink trait

View File

@ -274,14 +274,16 @@ screen_frame_sink_close(struct sc_frame_sink *sink) {
static bool static bool
screen_frame_sink_push(struct sc_frame_sink *sink, const AVFrame *frame) { screen_frame_sink_push(struct sc_frame_sink *sink, const AVFrame *frame) {
struct screen *screen = DOWNCAST(sink); struct screen *screen = DOWNCAST(sink);
return sc_video_buffer_push(&screen->vb, frame);
}
bool previous_frame_skipped; static void
bool ok = video_buffer_push(&screen->vb, frame, &previous_frame_skipped); sc_video_buffer_on_new_frame(struct sc_video_buffer *vb, bool previous_skipped,
if (!ok) { void *userdata) {
return false; (void) vb;
} struct screen *screen = userdata;
if (previous_frame_skipped) { if (previous_skipped) {
fps_counter_add_skipped_frame(&screen->fps_counter); fps_counter_add_skipped_frame(&screen->fps_counter);
// The EVENT_NEW_FRAME triggered for the previous frame will consume // The EVENT_NEW_FRAME triggered for the previous frame will consume
// this new frame instead // this new frame instead
@ -293,8 +295,6 @@ screen_frame_sink_push(struct sc_frame_sink *sink, const AVFrame *frame) {
// Post the event on the UI thread // Post the event on the UI thread
SDL_PushEvent(&new_frame_event); SDL_PushEvent(&new_frame_event);
} }
return true;
} }
bool bool
@ -304,15 +304,25 @@ screen_init(struct screen *screen, const struct screen_params *params) {
screen->fullscreen = false; screen->fullscreen = false;
screen->maximized = false; screen->maximized = false;
bool ok = video_buffer_init(&screen->vb); static const struct sc_video_buffer_callbacks cbs = {
.on_new_frame = sc_video_buffer_on_new_frame,
};
bool ok = sc_video_buffer_init(&screen->vb, 1000, &cbs, screen);
if (!ok) { if (!ok) {
LOGE("Could not initialize video buffer"); LOGE("Could not initialize video buffer");
return false; return false;
} }
ok = sc_video_buffer_start(&screen->vb);
if (!ok) {
LOGE("Could not start video_buffer");
goto error_destroy_video_buffer;
}
if (!fps_counter_init(&screen->fps_counter)) { if (!fps_counter_init(&screen->fps_counter)) {
LOGE("Could not initialize FPS counter"); LOGE("Could not initialize FPS counter");
goto error_destroy_video_buffer; goto error_stop_and_join_video_buffer;
} }
screen->frame_size = params->frame_size; screen->frame_size = params->frame_size;
@ -453,8 +463,11 @@ error_destroy_window:
SDL_DestroyWindow(screen->window); SDL_DestroyWindow(screen->window);
error_destroy_fps_counter: error_destroy_fps_counter:
fps_counter_destroy(&screen->fps_counter); fps_counter_destroy(&screen->fps_counter);
error_stop_and_join_video_buffer:
sc_video_buffer_stop(&screen->vb);
sc_video_buffer_join(&screen->vb);
error_destroy_video_buffer: error_destroy_video_buffer:
video_buffer_destroy(&screen->vb); sc_video_buffer_destroy(&screen->vb);
return false; return false;
} }
@ -471,11 +484,13 @@ screen_hide_window(struct screen *screen) {
void void
screen_interrupt(struct screen *screen) { screen_interrupt(struct screen *screen) {
sc_video_buffer_stop(&screen->vb);
fps_counter_interrupt(&screen->fps_counter); fps_counter_interrupt(&screen->fps_counter);
} }
void void
screen_join(struct screen *screen) { screen_join(struct screen *screen) {
sc_video_buffer_join(&screen->vb);
fps_counter_join(&screen->fps_counter); fps_counter_join(&screen->fps_counter);
} }
@ -489,7 +504,7 @@ screen_destroy(struct screen *screen) {
SDL_DestroyRenderer(screen->renderer); SDL_DestroyRenderer(screen->renderer);
SDL_DestroyWindow(screen->window); SDL_DestroyWindow(screen->window);
fps_counter_destroy(&screen->fps_counter); fps_counter_destroy(&screen->fps_counter);
video_buffer_destroy(&screen->vb); sc_video_buffer_destroy(&screen->vb);
} }
static void static void
@ -595,7 +610,7 @@ update_texture(struct screen *screen, const AVFrame *frame) {
static bool static bool
screen_update_frame(struct screen *screen) { screen_update_frame(struct screen *screen) {
av_frame_unref(screen->frame); av_frame_unref(screen->frame);
video_buffer_consume(&screen->vb, screen->frame); sc_video_buffer_consume(&screen->vb, screen->frame);
AVFrame *frame = screen->frame; AVFrame *frame = screen->frame;
fps_counter_add_rendered_frame(&screen->fps_counter); fps_counter_add_rendered_frame(&screen->fps_counter);

View File

@ -8,6 +8,7 @@
#include <libavformat/avformat.h> #include <libavformat/avformat.h>
#include "coords.h" #include "coords.h"
#include "fps_counter.h"
#include "opengl.h" #include "opengl.h"
#include "trait/frame_sink.h" #include "trait/frame_sink.h"
#include "video_buffer.h" #include "video_buffer.h"
@ -19,7 +20,7 @@ struct screen {
bool open; // track the open/close state to assert correct behavior bool open; // track the open/close state to assert correct behavior
#endif #endif
struct video_buffer vb; struct sc_video_buffer vb;
struct fps_counter fps_counter; struct fps_counter fps_counter;
SDL_Window *window; SDL_Window *window;

View File

@ -557,7 +557,7 @@ server_stop(struct server *server) {
#define WATCHDOG_DELAY_MS 1000 #define WATCHDOG_DELAY_MS 1000
signaled = sc_cond_timedwait(&server->process_terminated_cond, signaled = sc_cond_timedwait(&server->process_terminated_cond,
&server->mutex, &server->mutex,
WATCHDOG_DELAY_MS); sc_tick_now() + WATCHDOG_DELAY_MS);
} }
sc_mutex_unlock(&server->mutex); sc_mutex_unlock(&server->mutex);

View File

@ -1,6 +1,6 @@
// generic intrusive FIFO queue // generic intrusive FIFO queue
#ifndef QUEUE_H #ifndef SC_QUEUE_H
#define QUEUE_H #define SC_QUEUE_H
#include "common.h" #include "common.h"
@ -10,15 +10,15 @@
// To define a queue type of "struct foo": // To define a queue type of "struct foo":
// struct queue_foo QUEUE(struct foo); // struct queue_foo QUEUE(struct foo);
#define QUEUE(TYPE) { \ #define SC_QUEUE(TYPE) { \
TYPE *first; \ TYPE *first; \
TYPE *last; \ TYPE *last; \
} }
#define queue_init(PQ) \ #define sc_queue_init(PQ) \
(void) ((PQ)->first = (PQ)->last = NULL) (void) ((PQ)->first = (PQ)->last = NULL)
#define queue_is_empty(PQ) \ #define sc_queue_is_empty(PQ) \
!(PQ)->first !(PQ)->first
// NEXTFIELD is the field in the ITEM type used for intrusive linked-list // NEXTFIELD is the field in the ITEM type used for intrusive linked-list
@ -30,30 +30,30 @@
// }; // };
// //
// // define the type "struct my_queue" // // define the type "struct my_queue"
// struct my_queue QUEUE(struct foo); // struct my_queue SC_QUEUE(struct foo);
// //
// struct my_queue queue; // struct my_queue queue;
// queue_init(&queue); // sc_queue_init(&queue);
// //
// struct foo v1 = { .value = 42 }; // struct foo v1 = { .value = 42 };
// struct foo v2 = { .value = 27 }; // struct foo v2 = { .value = 27 };
// //
// queue_push(&queue, next, v1); // sc_queue_push(&queue, next, v1);
// queue_push(&queue, next, v2); // sc_queue_push(&queue, next, v2);
// //
// struct foo *foo; // struct foo *foo;
// queue_take(&queue, next, &foo); // sc_queue_take(&queue, next, &foo);
// assert(foo->value == 42); // assert(foo->value == 42);
// queue_take(&queue, next, &foo); // sc_queue_take(&queue, next, &foo);
// assert(foo->value == 27); // assert(foo->value == 27);
// assert(queue_is_empty(&queue)); // assert(sc_queue_is_empty(&queue));
// //
// push a new item into the queue // push a new item into the queue
#define queue_push(PQ, NEXTFIELD, ITEM) \ #define sc_queue_push(PQ, NEXTFIELD, ITEM) \
(void) ({ \ (void) ({ \
(ITEM)->NEXTFIELD = NULL; \ (ITEM)->NEXTFIELD = NULL; \
if (queue_is_empty(PQ)) { \ if (sc_queue_is_empty(PQ)) { \
(PQ)->first = (PQ)->last = (ITEM); \ (PQ)->first = (PQ)->last = (ITEM); \
} else { \ } else { \
(PQ)->last->NEXTFIELD = (ITEM); \ (PQ)->last->NEXTFIELD = (ITEM); \
@ -65,9 +65,9 @@
// the result is stored in *(PITEM) // the result is stored in *(PITEM)
// (without typeof(), we could not store a local variable having the correct // (without typeof(), we could not store a local variable having the correct
// type so that we can "return" it) // type so that we can "return" it)
#define queue_take(PQ, NEXTFIELD, PITEM) \ #define sc_queue_take(PQ, NEXTFIELD, PITEM) \
(void) ({ \ (void) ({ \
assert(!queue_is_empty(PQ)); \ assert(!sc_queue_is_empty(PQ)); \
*(PITEM) = (PQ)->first; \ *(PITEM) = (PQ)->first; \
(PQ)->first = (PQ)->first->NEXTFIELD; \ (PQ)->first = (PQ)->first->NEXTFIELD; \
}) })

View File

@ -2,6 +2,7 @@
#include <assert.h> #include <assert.h>
#include <SDL2/SDL_thread.h> #include <SDL2/SDL_thread.h>
#include <SDL2/SDL_timer.h>
#include "log.h" #include "log.h"
@ -123,8 +124,14 @@ sc_cond_wait(sc_cond *cond, sc_mutex *mutex) {
} }
bool bool
sc_cond_timedwait(sc_cond *cond, sc_mutex *mutex, uint32_t ms) { sc_cond_timedwait(sc_cond *cond, sc_mutex *mutex, sc_tick deadline) {
int r = SDL_CondWaitTimeout(cond->cond, mutex->mutex, ms); sc_tick now = sc_tick_now();
if (deadline <= now) {
return false; // timeout
}
sc_tick delay = deadline - now;
int r = SDL_CondWaitTimeout(cond->cond, mutex->mutex, delay);
#ifndef NDEBUG #ifndef NDEBUG
if (r < 0) { if (r < 0) {
LOGC("Could not wait on condition with timeout: %s", SDL_GetError()); LOGC("Could not wait on condition with timeout: %s", SDL_GetError());
@ -163,3 +170,11 @@ sc_cond_broadcast(sc_cond *cond) {
(void) r; (void) r;
#endif #endif
} }
sc_tick
sc_tick_now(void) {
// SDL ticks is an unsigned 32 bits, but this is an implementation detail.
// It wraps if the program runs for more than ~49 days, but in practice we
// can assume it does not.
return (sc_tick) SDL_GetTicks();
}

View File

@ -16,6 +16,8 @@ typedef int sc_thread_fn(void *);
typedef unsigned sc_thread_id; typedef unsigned sc_thread_id;
typedef atomic_uint sc_atomic_thread_id; typedef atomic_uint sc_atomic_thread_id;
typedef int64_t sc_tick;
typedef struct sc_thread { typedef struct sc_thread {
SDL_Thread *thread; SDL_Thread *thread;
} sc_thread; } sc_thread;
@ -72,7 +74,7 @@ sc_cond_wait(sc_cond *cond, sc_mutex *mutex);
// return true on signaled, false on timeout // return true on signaled, false on timeout
bool bool
sc_cond_timedwait(sc_cond *cond, sc_mutex *mutex, uint32_t ms); sc_cond_timedwait(sc_cond *cond, sc_mutex *mutex, sc_tick deadline);
void void
sc_cond_signal(sc_cond *cond); sc_cond_signal(sc_cond *cond);
@ -80,4 +82,7 @@ sc_cond_signal(sc_cond *cond);
void void
sc_cond_broadcast(sc_cond *cond); sc_cond_broadcast(sc_cond *cond);
sc_tick
sc_tick_now(void);
#endif #endif

View File

@ -121,11 +121,11 @@ run_v4l2_sink(void *data) {
break; break;
} }
video_buffer_consume(&vs->vb, vs->frame);
vs->has_frame = false; vs->has_frame = false;
sc_mutex_unlock(&vs->mutex); sc_mutex_unlock(&vs->mutex);
sc_video_buffer_consume(&vs->vb, vs->frame);
bool ok = encode_and_write_frame(vs, vs->frame); bool ok = encode_and_write_frame(vs, vs->frame);
av_frame_unref(vs->frame); av_frame_unref(vs->frame);
if (!ok) { if (!ok) {
@ -139,17 +139,42 @@ run_v4l2_sink(void *data) {
return 0; return 0;
} }
static void
sc_video_buffer_on_new_frame(struct sc_video_buffer *vb, bool previous_skipped,
void *userdata) {
(void) vb;
struct sc_v4l2_sink *vs = userdata;
if (!previous_skipped) {
sc_mutex_lock(&vs->mutex);
vs->has_frame = true;
sc_cond_signal(&vs->cond);
sc_mutex_unlock(&vs->mutex);
}
}
static bool static bool
sc_v4l2_sink_open(struct sc_v4l2_sink *vs) { sc_v4l2_sink_open(struct sc_v4l2_sink *vs) {
bool ok = video_buffer_init(&vs->vb); static const struct sc_video_buffer_callbacks cbs = {
.on_new_frame = sc_video_buffer_on_new_frame,
};
bool ok = sc_video_buffer_init(&vs->vb, 1, &cbs, vs);
if (!ok) { if (!ok) {
LOGE("Could not initialize video buffer");
return false; return false;
} }
ok = sc_video_buffer_start(&vs->vb);
if (!ok) {
LOGE("Could not start video buffer");
goto error_video_buffer_destroy;
}
ok = sc_mutex_init(&vs->mutex); ok = sc_mutex_init(&vs->mutex);
if (!ok) { if (!ok) {
LOGC("Could not create mutex"); LOGC("Could not create mutex");
goto error_video_buffer_destroy; goto error_video_buffer_stop_and_join;
} }
ok = sc_cond_init(&vs->cond); ok = sc_cond_init(&vs->cond);
@ -274,8 +299,11 @@ error_cond_destroy:
sc_cond_destroy(&vs->cond); sc_cond_destroy(&vs->cond);
error_mutex_destroy: error_mutex_destroy:
sc_mutex_destroy(&vs->mutex); sc_mutex_destroy(&vs->mutex);
error_video_buffer_stop_and_join:
sc_video_buffer_stop(&vs->vb);
sc_video_buffer_join(&vs->vb);
error_video_buffer_destroy: error_video_buffer_destroy:
video_buffer_destroy(&vs->vb); sc_video_buffer_destroy(&vs->vb);
return false; return false;
} }
@ -287,7 +315,10 @@ sc_v4l2_sink_close(struct sc_v4l2_sink *vs) {
sc_cond_signal(&vs->cond); sc_cond_signal(&vs->cond);
sc_mutex_unlock(&vs->mutex); sc_mutex_unlock(&vs->mutex);
sc_video_buffer_stop(&vs->vb);
sc_thread_join(&vs->thread, NULL); sc_thread_join(&vs->thread, NULL);
sc_video_buffer_join(&vs->vb);
av_packet_free(&vs->packet); av_packet_free(&vs->packet);
av_frame_free(&vs->frame); av_frame_free(&vs->frame);
@ -297,24 +328,12 @@ sc_v4l2_sink_close(struct sc_v4l2_sink *vs) {
avformat_free_context(vs->format_ctx); avformat_free_context(vs->format_ctx);
sc_cond_destroy(&vs->cond); sc_cond_destroy(&vs->cond);
sc_mutex_destroy(&vs->mutex); sc_mutex_destroy(&vs->mutex);
video_buffer_destroy(&vs->vb); sc_video_buffer_destroy(&vs->vb);
} }
static bool static bool
sc_v4l2_sink_push(struct sc_v4l2_sink *vs, const AVFrame *frame) { sc_v4l2_sink_push(struct sc_v4l2_sink *vs, const AVFrame *frame) {
sc_mutex_lock(&vs->mutex); return sc_video_buffer_push(&vs->vb, frame);
bool ok = video_buffer_push(&vs->vb, frame, NULL);
if (!ok) {
return false;
}
vs->has_frame = true;
sc_cond_signal(&vs->cond);
sc_mutex_unlock(&vs->mutex);
return true;
} }
static bool static bool

View File

@ -12,7 +12,7 @@
struct sc_v4l2_sink { struct sc_v4l2_sink {
struct sc_frame_sink frame_sink; // frame sink trait struct sc_frame_sink frame_sink; // frame sink trait
struct video_buffer vb; struct sc_video_buffer vb;
AVFormatContext *format_ctx; AVFormatContext *format_ctx;
AVCodecContext *encoder_ctx; AVCodecContext *encoder_ctx;

View File

@ -6,83 +6,219 @@
#include "util/log.h" #include "util/log.h"
bool struct sc_clock {
video_buffer_init(struct video_buffer *vb) { double coeff;
vb->pending_frame = av_frame_alloc(); sc_tick offset;
if (!vb->pending_frame) { unsigned range;
return false;
struct {
sc_tick system;
sc_tick stream;
} last;
};
static void
sc_clock_init(struct sc_clock *clock) {
clock->coeff = 1;
clock->offset = 0;
clock->range = 0;
clock->last.system = 0;
clock->last.stream = 0;
}
static void
sc_clock_update(struct sc_clock *clock, sc_tick now, sc_tick stream_ts) {
sc_tick system_delta = now - clock->last.system;
sc_tick stream_delta = stream_ts - clock->last.stream;
double instant_coeff = (double) system_delta / stream_delta;
}
static sc_tick
sc_clock_get_system_ts(struct sc_clock *clock, sc_tick stream_ts) {
return (sc_tick) (stream_ts * clock->coeff) + clock->offset;
}
static struct sc_video_buffer_frame *
sc_video_buffer_frame_new(const AVFrame *frame) {
struct sc_video_buffer_frame *vb_frame = malloc(sizeof(*vb_frame));
if (!vb_frame) {
return NULL;
} }
vb->tmp_frame = av_frame_alloc(); vb_frame->frame = av_frame_alloc();
if (!vb->tmp_frame) { if (!vb_frame->frame) {
av_frame_free(&vb->pending_frame); free(vb_frame);
return false; return NULL;
} }
bool ok = sc_mutex_init(&vb->mutex); if (av_frame_ref(vb_frame->frame, frame)) {
av_frame_free(&vb_frame->frame);
free(vb_frame);
return NULL;
}
return vb_frame;
}
static void
sc_video_buffer_frame_delete(struct sc_video_buffer_frame *vb_frame) {
av_frame_unref(vb_frame->frame);
av_frame_free(&vb_frame->frame);
free(vb_frame);
}
static bool
sc_video_buffer_offer(struct sc_video_buffer *vb, const AVFrame *frame) {
bool previous_skipped;
bool ok = sc_frame_buffer_push(&vb->fb, frame, &previous_skipped);
if (!ok) { if (!ok) {
av_frame_free(&vb->pending_frame);
av_frame_free(&vb->tmp_frame);
return false; return false;
} }
// there is initially no frame, so consider it has already been consumed vb->cbs->on_new_frame(vb, previous_skipped, vb->cbs_userdata);
vb->pending_frame_consumed = true;
return true; return true;
} }
void static int
video_buffer_destroy(struct video_buffer *vb) { run_buffering(void *data) {
sc_mutex_destroy(&vb->mutex); struct sc_video_buffer *vb = data;
av_frame_free(&vb->pending_frame);
av_frame_free(&vb->tmp_frame);
}
static inline void assert(vb->buffering_ms);
swap_frames(AVFrame **lhs, AVFrame **rhs) {
AVFrame *tmp = *lhs; for (;;) {
*lhs = *rhs; sc_mutex_lock(&vb->b.mutex);
*rhs = tmp;
while (!vb->b.stopped && sc_queue_is_empty(&vb->b.queue)) {
sc_cond_wait(&vb->b.queue_cond, &vb->b.mutex);
}
if (vb->b.stopped) {
// Flush queue
while (!sc_queue_is_empty(&vb->b.queue)) {
struct sc_video_buffer_frame *vb_frame;
sc_queue_take(&vb->b.queue, next, &vb_frame);
sc_video_buffer_frame_delete(vb_frame);
}
break;
}
struct sc_video_buffer_frame *vb_frame;
sc_queue_take(&vb->b.queue, next, &vb_frame);
sc_mutex_unlock(&vb->b.mutex);
usleep(vb->buffering_ms * 1000);
sc_video_buffer_offer(vb, vb_frame->frame);
sc_video_buffer_frame_delete(vb_frame);
}
LOGD("Buffering thread ended");
return 0;
} }
bool bool
video_buffer_push(struct video_buffer *vb, const AVFrame *frame, sc_video_buffer_init(struct sc_video_buffer *vb, unsigned buffering_ms,
bool *previous_frame_skipped) { const struct sc_video_buffer_callbacks *cbs,
sc_mutex_lock(&vb->mutex); void *cbs_userdata) {
bool ok = sc_frame_buffer_init(&vb->fb);
// Use a temporary frame to preserve pending_frame in case of error. if (!ok) {
// tmp_frame is an empty frame, no need to call av_frame_unref() beforehand.
int r = av_frame_ref(vb->tmp_frame, frame);
if (r) {
LOGE("Could not ref frame: %d", r);
return false; return false;
} }
// Now that av_frame_ref() succeeded, we can replace the previous if (buffering_ms) {
// pending_frame ok = sc_mutex_init(&vb->b.mutex);
swap_frames(&vb->pending_frame, &vb->tmp_frame); if (!ok) {
av_frame_unref(vb->tmp_frame); LOGC("Could not create mutex");
sc_frame_buffer_destroy(&vb->fb);
return false;
}
if (previous_frame_skipped) { ok = sc_cond_init(&vb->b.queue_cond);
*previous_frame_skipped = !vb->pending_frame_consumed; if (!ok) {
LOGC("Could not create cond");
sc_mutex_destroy(&vb->b.mutex);
sc_frame_buffer_destroy(&vb->fb);
return false;
}
sc_queue_init(&vb->b.queue);
} }
vb->pending_frame_consumed = false;
sc_mutex_unlock(&vb->mutex); assert(cbs);
assert(cbs->on_new_frame);
vb->buffering_ms = buffering_ms;
vb->cbs = cbs;
vb->cbs_userdata = cbs_userdata;
return true;
}
bool
sc_video_buffer_start(struct sc_video_buffer *vb) {
if (vb->buffering_ms) {
bool ok =
sc_thread_create(&vb->b.thread, run_buffering, "buffering", vb);
if (!ok) {
LOGE("Could not start buffering thread");
return false;
}
}
return true; return true;
} }
void void
video_buffer_consume(struct video_buffer *vb, AVFrame *dst) { sc_video_buffer_stop(struct sc_video_buffer *vb) {
sc_mutex_lock(&vb->mutex); if (vb->buffering_ms) {
assert(!vb->pending_frame_consumed); sc_mutex_lock(&vb->b.mutex);
vb->pending_frame_consumed = true; vb->b.stopped = true;
sc_mutex_unlock(&vb->b.mutex);
av_frame_move_ref(dst, vb->pending_frame); }
// av_frame_move_ref() resets its source frame, so no need to call }
// av_frame_unref()
void
sc_mutex_unlock(&vb->mutex); sc_video_buffer_join(struct sc_video_buffer *vb) {
if (vb->buffering_ms) {
sc_thread_join(&vb->b.thread, NULL);
}
}
void
sc_video_buffer_destroy(struct sc_video_buffer *vb) {
sc_frame_buffer_destroy(&vb->fb);
if (vb->buffering_ms) {
sc_cond_destroy(&vb->b.queue_cond);
sc_mutex_destroy(&vb->b.mutex);
}
}
bool
sc_video_buffer_push(struct sc_video_buffer *vb, const AVFrame *frame) {
if (!vb->buffering_ms) {
// no buffering
return sc_video_buffer_offer(vb, frame);
}
struct sc_video_buffer_frame *vb_frame = sc_video_buffer_frame_new(frame);
if (!vb_frame) {
LOGE("Could not allocate frame");
return false;
}
sc_mutex_lock(&vb->b.mutex);
sc_queue_push(&vb->b.queue, next, vb_frame);
sc_cond_signal(&vb->b.queue_cond);
sc_mutex_unlock(&vb->b.mutex);
return true;
}
void
sc_video_buffer_consume(struct sc_video_buffer *vb, AVFrame *dst) {
sc_frame_buffer_consume(&vb->fb, dst);
} }

View File

@ -1,50 +1,68 @@
#ifndef VIDEO_BUFFER_H #ifndef SC_VIDEO_BUFFER_H
#define VIDEO_BUFFER_H #define SC_VIDEO_BUFFER_H
#include "common.h" #include "common.h"
#include <stdbool.h> #include <stdbool.h>
#include "fps_counter.h" #include "frame_buffer.h"
#include "util/queue.h"
#include "util/thread.h" #include "util/thread.h"
// forward declarations // forward declarations
typedef struct AVFrame AVFrame; typedef struct AVFrame AVFrame;
/** struct sc_video_buffer_frame {
* A video buffer holds 1 pending frame, which is the last frame received from AVFrame *frame;
* the producer (typically, the decoder). struct sc_video_buffer_frame *next;
* };
* If a pending frame has not been consumed when the producer pushes a new
* frame, then it is lost. The intent is to always provide access to the very
* last frame to minimize latency.
*
* The producer and the consumer typically do not live in the same thread.
* That's the reason why the callback on_frame_available() does not provide the
* frame as parameter: the consumer might post an event to its own thread to
* retrieve the pending frame from there, and that frame may have changed since
* the callback if producer pushed a new one in between.
*/
struct video_buffer { struct sc_video_buffer_frame_queue SC_QUEUE(struct sc_video_buffer_frame);
AVFrame *pending_frame;
AVFrame *tmp_frame; // To preserve the pending frame on error
sc_mutex mutex; struct sc_video_buffer {
struct sc_frame_buffer fb;
bool pending_frame_consumed; unsigned buffering_ms;
// only if buffering_ms > 0
struct {
sc_thread thread;
sc_mutex mutex;
sc_cond queue_cond;
struct sc_video_buffer_frame_queue queue;
bool stopped;
} b; // buffering
const struct sc_video_buffer_callbacks *cbs;
void *cbs_userdata;
};
struct sc_video_buffer_callbacks {
void (*on_new_frame)(struct sc_video_buffer *vb, bool previous_skipped,
void *userdata);
}; };
bool bool
video_buffer_init(struct video_buffer *vb); sc_video_buffer_init(struct sc_video_buffer *vb, unsigned buffering_ms,
const struct sc_video_buffer_callbacks *cbs,
void void *cbs_userdata);
video_buffer_destroy(struct video_buffer *vb);
bool bool
video_buffer_push(struct video_buffer *vb, const AVFrame *frame, bool *skipped); sc_video_buffer_start(struct sc_video_buffer *vb);
void void
video_buffer_consume(struct video_buffer *vb, AVFrame *dst); sc_video_buffer_stop(struct sc_video_buffer *vb);
void
sc_video_buffer_join(struct sc_video_buffer *vb);
void
sc_video_buffer_destroy(struct sc_video_buffer *vb);
bool
sc_video_buffer_push(struct sc_video_buffer *vb, const AVFrame *frame);
void
sc_video_buffer_consume(struct sc_video_buffer *vb, AVFrame *dst);
#endif #endif

View File

@ -10,28 +10,28 @@ struct foo {
}; };
static void test_queue(void) { static void test_queue(void) {
struct my_queue QUEUE(struct foo) queue; struct my_queue SC_QUEUE(struct foo) queue;
queue_init(&queue); sc_queue_init(&queue);
assert(queue_is_empty(&queue)); assert(sc_queue_is_empty(&queue));
struct foo v1 = { .value = 42 }; struct foo v1 = { .value = 42 };
struct foo v2 = { .value = 27 }; struct foo v2 = { .value = 27 };
queue_push(&queue, next, &v1); sc_queue_push(&queue, next, &v1);
queue_push(&queue, next, &v2); sc_queue_push(&queue, next, &v2);
struct foo *foo; struct foo *foo;
assert(!queue_is_empty(&queue)); assert(!sc_queue_is_empty(&queue));
queue_take(&queue, next, &foo); sc_queue_take(&queue, next, &foo);
assert(foo->value == 42); assert(foo->value == 42);
assert(!queue_is_empty(&queue)); assert(!sc_queue_is_empty(&queue));
queue_take(&queue, next, &foo); sc_queue_take(&queue, next, &foo);
assert(foo->value == 27); assert(foo->value == 27);
assert(queue_is_empty(&queue)); assert(sc_queue_is_empty(&queue));
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {