Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/moq-output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ bool MoQOutput::Start()
auto result = moq_origin_publish(origin, path.data(), path.size(), broadcast);
if (result < 0) {
LOG_ERROR("Failed to publish broadcast to session: %d", result);
// The session connected above; close it so a retry on this same output
// doesn't reuse the stale handle. Its terminal callback releases the
// outstanding-session reference the destructor waits on.
Stop(false);
return false;
}

Expand Down
47 changes: 43 additions & 4 deletions src/moq-source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <util/dstr.h>

#include <atomic>
#include <string>
#include <errno.h>
#include <time.h>

Expand Down Expand Up @@ -227,19 +228,30 @@ static void moq_source_destroy(void *data)
// bug or an unaccounted handle): far better to log and proceed than to hang
// OBS on source deletion. In normal operation the terminals arrive within
// milliseconds and the timeout is never reached.
bool timed_out = false;
if (--ctx->refs > 0) {
struct timespec deadline;
timespec_get(&deadline, TIME_UTC);
deadline.tv_sec += 2;
while (ctx->refs > 0) {
if (pthread_cond_timedwait(&ctx->refs_zero, &ctx->mutex, &deadline) == ETIMEDOUT) {
LOG_WARNING("Teardown timed out with %d MoQ callback(s) still outstanding", ctx->refs);
LOG_WARNING("Teardown timed out with %d MoQ callback(s) still outstanding; "
"leaking ctx to avoid a use-after-free",
ctx->refs);
timed_out = true;
break;
}
}
}
pthread_mutex_unlock(&ctx->mutex);

// A subscription callback still holds ctx (it references ctx->mutex,
// ctx->refs, ctx->refs_zero). Freeing now would be a use-after-free when
// that callback fires, so intentionally leak instead. This only happens on
// the abnormal timeout path above.
if (timed_out)
return;

bfree(ctx->url);
bfree(ctx->broadcast);
// Note: frame_buffer is already freed by moq_source_disconnect_locked
Expand All @@ -250,6 +262,32 @@ static void moq_source_destroy(void *data)
bfree(ctx);
}

// Relay URLs can embed credentials (userinfo) or a query/path token, and OBS
// logs are frequently shared for support. Reduce a URL to scheme://host[:port]
// for logging so secrets never reach persistent logs.
static std::string redact_url(const char *url)
{
if (!url || !*url)
return "(null)";

std::string s(url);
size_t scheme = s.find("://");
std::string prefix = (scheme == std::string::npos) ? "" : s.substr(0, scheme + 3);
size_t rest = (scheme == std::string::npos) ? 0 : scheme + 3;

// The authority ends at the first '/', '?' or '#'.
size_t auth_end = s.find_first_of("/?#", rest);
std::string authority = s.substr(rest, auth_end == std::string::npos ? std::string::npos : auth_end - rest);

// Drop any userinfo (user:pass@). Use the last '@' so an unescaped '@' in a
// password can't leave part of it behind.
size_t at = authority.rfind('@');
if (at != std::string::npos)
authority = authority.substr(at + 1);

return prefix + authority;
}

static void moq_source_update(void *data, obs_data_t *settings)
{
struct moq_source *ctx = (struct moq_source *)data;
Expand Down Expand Up @@ -280,7 +318,7 @@ static void moq_source_update(void *data, obs_data_t *settings)

// If settings changed and are valid, reconnect
if (settings_changed && valid) {
LOG_INFO("Settings changed, reconnecting (url=%s, broadcast=%s)", url ? url : "(null)",
LOG_INFO("Settings changed, reconnecting (url=%s, broadcast=%s)", redact_url(url).c_str(),
broadcast ? broadcast : "(null)");
moq_source_reconnect(ctx);
} else if (settings_changed && !valid) {
Expand Down Expand Up @@ -1114,8 +1152,9 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id)
sws_scale(ctx->sws_ctx, (const uint8_t *const *)frame->data, frame->linesize, 0, ctx->frame.height, dst_data,
dst_linesize);

// Update OBS frame timestamp and output
ctx->frame.timestamp = frame_data.timestamp_us;
// Update OBS frame timestamp and output. OBS expects nanoseconds; libmoq
// delivers microseconds.
ctx->frame.timestamp = frame_data.timestamp_us * 1000;
obs_source_output_video(ctx->source, &ctx->frame);

av_frame_free(&frame);
Expand Down
Loading