diff --git a/PLAN.prd b/PLAN.prd new file mode 100644 index 0000000..3d2cd39 --- /dev/null +++ b/PLAN.prd @@ -0,0 +1,482 @@ +# Multivariant Output Implementation Plan for Membrane.Transcoder + +## Goal + +Implement multivariant output capability for `Membrane.Transcoder`, allowing a single input stream to be transcoded into multiple output streams with independent configurations, while maintaining full backward compatibility with existing single-output usage. + +--- + +## Current State Analysis + +### Membrane.Transcoder (current) + +- **Type**: Bin (composes FFmpeg-based elements internally) +- **Input**: Single pad, accepts multiple stream formats +- **Output**: Single pad, with configurable `output_stream_format` +- **Architecture**: Builds internal pipeline in `handle_init` and `handle_child_notification` +- **Options**: Global `output_stream_format`, `transcoding_policy`, `native_acceleration` + +### Membrane.VKVideo.Transcoder (reference implementation) + +- **Type**: Filter (native Vulkan implementation) +- **Input**: Single pad +- **Output**: Multiple dynamic pads with `:on_request` availability +- **Mechanism**: + - `handle_pad_added/3` collects per-output options into `OutputSpec` structs + - `handle_stream_format/4` initializes native transcoder with all specs + - `handle_buffer/4` distributes to all outputs + - All output pads must be linked in the same spec as the transcoder element + +--- + +## Requirements (from user) + +1. **Architecture**: Extend existing Bin (not a new element) +2. **native_acceleration**: Per-output configuration +3. **output_stream_format**: Per-output configuration +4. 
**Backward compatibility**: Existing single-output code must work unchanged + +--- + +## Proposed Architecture + +### High-Level Design + +``` +Input Stream + | + v +[Stream Format Changer] (optional, if assumed_input_stream_format) + | + v +[Connector] (notifies on stream format) + | + +-- [Decoding Pipeline] --> [Transcoding Pipeline 0] --> [Funnel 0] --> =output_0 + | + +-- [Decoding Pipeline] --> [Transcoding Pipeline 1] --> [Funnel 1] --> =output_1 + | + +-- ... (one per output pad) + v +[Output Funnel] (for backward compat, if no via_out used) + | + v +=output (legacy single output) +``` + +### Key Design Decisions + +1. **Output Pad Definition**: Change from single static output to `:on_request` dynamic output pads +2. **State Management**: Store output specifications keyed by pad_ref in bin state +3. **Pipeline Building**: Create separate transcoding pipeline for each output pad +4. **Shared Decoding**: Video: share decoder across all outputs (decode once, encode multiple times). Audio: same approach. +5. **Backward Compatibility**: If no `via_out` is used, auto-create single default output pad + +--- + +## Output Pad Options Specification + +```elixir +@type output_pad_options :: %{ + output_stream_format: Membrane.Transcoder.stream_format() | Membrane.Transcoder.stream_format_module() | Membrane.Transcoder.stream_format_tuple() | Membrane.Transcoder.stream_format_resolver(), + width: non_neg_integer() | nil, + height: non_neg_integer() | nil, + native_acceleration: :never | :if_available, + transcoding_policy: :always | :if_needed | :never | (stream_format() -> :always | :if_needed | :never) +} +``` + +All options are optional. 
Defaults: + +- `output_stream_format`: Inherit from bin's global option (backward compat) +- `native_acceleration`: Inherit from bin's global option +- `transcoding_policy`: Inherit from bin's global option +- `width/height`: Nil (preserve input resolution) + +--- + +## Transcoding Options Map Specification + +The current functions accepting `use_hardware_acceleration?` boolean flag should be updated to accept a typed map of transcoding options. + +```elixir +@type transcoding_options :: %{ + native_acceleration: :never | :if_available, + preset: :ultrafast | :superfast | :veryfast | :faster | :fast | :medium | :slow | :slower | :veryslow | nil +} +``` + +### Option Descriptions + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `native_acceleration` | `:never` \| `:if_available` | `:never` | Whether to use Vulkan hardware acceleration. | +| `preset` | atom \| nil | `:ultrafast` | Encoding preset for FFmpeg encoders. | + +### Source of Options + +- `native_acceleration`: Existing bin option, converted to `use_hardware_acceleration?` boolean internally. +- `preset`: Existing option in `H264.FFmpeg.Encoder` and `H265.FFmpeg.Encoder`, currently hardcoded to `:ultrafast`. + +### Functions to Update + +| File | Function | +|------|----------| +| `lib/transcoder/video.ex` | `plug_video_transcoding/5` | +| `lib/transcoder/audio.ex` | `plug_audio_transcoding/5` | +| `lib/transcoder.ex` | `plug_transcoding/5` | + +> **Note**: `width` and `height` are already covered in `output_pad_options`. 
+ +--- + +## API Examples + +### Example 1: Single Output (Backward Compatible - Existing) + +```elixir +child(:transcoder, %Membrane.Transcoder{ + output_stream_format: H264, + native_acceleration: :if_available +}) +``` + +### Example 2: Single Output with via_out + +```elixir +child(:transcoder, Membrane.Transcoder), +get_child(:transcoder) +|> via_out(Pad.ref(:output, 0), options: [ + output_stream_format: H264, + native_acceleration: :if_available +]) +|> child(:sink, %File.Sink{location: "output.h264"}) +``` + +### Example 3: Multiple Outputs (New) + +```elixir +child(:transcoder, Membrane.Transcoder), +get_child(:transcoder) +|> via_out(Pad.ref(:output, 0), options: [ + output_stream_format: H264, + width: 1280, + height: 720, + native_acceleration: :if_available +]) +|> child(:sink_hd, %File.Sink{location: "hd.h264"}), + +get_child(:transcoder) +|> via_out(Pad.ref(:output, 1), options: [ + output_stream_format: H264, + width: 640, + height: 360 +]) +|> child(:sink_sd, %File.Sink{location: "sd.h264"}), + +get_child(:transcoder) +|> via_out(Pad.ref(:output, 2), options: [ + output_stream_format: Membrane.H265, + width: 1920, + height: 1080, + native_acceleration: :never +]) +|> child(:sink_4k, %File.Sink{location: "4k.h265"}) +``` + +### Example 4: Mixed Video and Audio (Conceptual) + +```elixir +# Note: Each output pad can have different stream formats +get_child(:transcoder) +|> via_out(Pad.ref(:output, 0), options: [ + output_stream_format: H264 +]) +|> child(:video_sink, %File.Sink{location: "video.h264"}), + +get_child(:transcoder) +|> via_out(Pad.ref(:output, 1), options: [ + output_stream_format: Opus +]) +|> child(:audio_sink, %File.Sink{location: "audio.opus"}) +``` + +--- + +## Implementation Tasks + +### Phase 1: Core Bin Modifications (`lib/transcoder.ex`) + +#### Task 1.1: Update Output Pad Definition + +- [ ] Change `def_output_pad :output` to use `availability: :on_request` +- [ ] Add `options` parameter to output pad with: + - 
`output_stream_format` (per-output) + - `width` (per-output) + - `height` (per-output) + - `native_acceleration` (per-output) + - `transcoding_policy` (per-output) + +#### Task 1.2: Add State for Output Specifications + +- [ ] Add to state in `handle_init/2`: + + ```elixir + %{ + output_specs: %{}, # Pad.ref() -> output_pad_options + input_stream_format: nil, + use_hardware_acceleration?: boolean, + # ... existing state + } + ``` + +#### Task 1.3: Implement handle_pad_added/3 + +```elixir +@impl true +def handle_pad_added(pad_ref, %{playback: :playing} = _ctx, _state) do + raise "Output pad #{inspect(pad_ref)} was linked while already playing. " <> + "All output pads must be linked in the same spec as the transcoder." +end + +@impl true +def handle_pad_added(pad_ref, ctx, state) do + pad_opts = ctx.pads[pad_ref].options + + # Validate and store output spec + output_spec = validate_output_spec(pad_opts, state) + + {[], %{state | output_specs: Map.put(state.output_specs, pad_ref, output_spec)}} +end +``` + +#### Task 1.4: Implement handle_pad_removed/3 + +```elixir +@impl true +def handle_pad_removed(pad_ref, _ctx, state) do + {[], %{state | output_specs: Map.delete(state.output_specs, pad_ref)}} +end +``` + +#### Task 1.5: Modify handle_init/2 for Dynamic Output + +- [ ] Remove `bin_output()` from initial spec (no static output) +- [ ] Initialize `output_specs: %{}` in state +- [ ] Keep `output_funnel` for backward compatibility path + +#### Task 1.6: Modify handle_child_notification for Stream Format + +Current: Builds single pipeline when input stream format arrives +New: + +- If `output_specs` is empty (no via_out used), create default single output +- For each output in `output_specs`, build a transcoding pipeline +- Connect each pipeline to its corresponding output pad via funnel + +Pseudocode: + +```elixir +def handle_child_notification({:stream_format, _pad, format}, :connector, _ctx, state) do + state = %{state | input_stream_format: format} + + # If no 
output_specs (backward compat), create default + output_specs = if map_size(state.output_specs) == 0 do + %{Pad.ref(:output, 0) => state.opts} # Use bin's global options + else + state.output_specs + end + + # Build pipelines for each output + spec = build_multivariant_spec(state, output_specs) + + {[spec: spec], state} +end + +defp build_multivariant_spec(state, output_specs) do + base_builder = + bin_input() + |> maybe_plug_stream_format_changer(state.assumed_input_stream_format) + |> child(:connector, %Membrane.Connector{notify_on_stream_format?: true}) + |> child(:decoder, get_decoder_for(state.input_stream_format)) + + # For each output, create: converter -> encoder -> parser -> funnel -> output + Enum.reduce(output_specs, base_builder, fn {Pad.ref(:output, id) = pad_ref, opts}, builder -> + funnel_name = :"funnel_#{id}" + + builder + |> plug_output_pipeline(pad_ref, opts, funnel_name) + |> child(funnel_name, Funnel) + |> bin_output(pad_ref) + end) +end +``` + +#### Task 1.7: Update Options Documentation + +- [ ] Document new per-output options +- [ ] Document that options can be inherited from bin-level + +### Phase 2: Video Transcoding Pipeline (`lib/transcoder/video.ex`) + +#### Task 2.1: Create Per-Output Video Pipeline Builder + +```elixir +defp plug_video_output_pipeline(builder, input_format, output_opts, funnel_name) do + output_format = resolve_output_format(output_opts.output_stream_format, input_format) + use_hw = resolve_native_acceleration(output_opts.native_acceleration, global_hw_setting) + policy = resolve_transcoding_policy(output_opts.transcoding_policy, global_policy) + + # Build: swscale (if needed) -> encoder -> parser + builder + |> maybe_plug_swscale_converter(input_format, output_format, output_opts.width, output_opts.height) + |> maybe_plug_encoder_and_parser(output_format, use_hw) + |> get_child(funnel_name) +end +``` + +#### Task 2.2: Handle Width/Height Scaling + +- [ ] Add scaling support when width/height differ from input +- [ ] Use 
SWScale.Converter for software scaling +- [ ] Consider: VKVideo scaling if native_acceleration is enabled + +### Phase 3: Audio Transcoding Pipeline (`lib/transcoder/audio.ex`) + +#### Task 3.1: Create Per-Output Audio Pipeline Builder + +Similar to video, but for audio: + +- Resample if needed (for sample rate, channels, or format changes) +- Encode to target format +- Parse output + +### Phase 4: Backward Compatibility + +#### Task 4.1: Auto-Create Default Output + +If no `via_out` is linked by the time stream format arrives: + +- Create a single output with pad_ref `Pad.ref(:output, 0)` +- Use bin's global options for this output +- This maintains existing behavior + +#### Task 4.2: Validate No Mixed Usage + +- [ ] Detect if user tries to use both `output_stream_format` option AND `via_out` +- [ ] Raise clear error message + +### Phase 5: Testing + +#### Task 5.1: Unit Tests + +- [ ] Test `handle_pad_added` with valid options +- [ ] Test `handle_pad_added` raises when already playing +- [ ] Test option inheritance from bin to output +- [ ] Test validation of output specs + +#### Task 5.2: Integration Tests + +- [ ] Test single output via bin options +- [ ] Test single output via `via_out` +- [ ] Test multiple outputs with different formats +- [ ] Test multiple outputs with different resolutions +- [ ] Test mixed video/audio outputs +- [ ] Test multiple outputs with different native_acceleration settings (requires `:vulkan` tag, skip during this feature) + +#### Task 5.3: Example Files + +- [ ] Create `examples/multivariant_h264.exs` demonstrating multiple outputs +- [ ] Update existing examples to show both old and new API + +--- + +## File Changes Summary + +| File | Changes | +|------|---------| +| `lib/transcoder.ex` | Add `:on_request` output pad with options, implement `handle_pad_added`, `handle_pad_removed`, modify pipeline building logic, add state for output_specs | +| `lib/transcoder/video.ex` | Add `plug_video_output_pipeline/4` function, support 
per-output width/height | +| `lib/transcoder/audio.ex` | Add `plug_audio_output_pipeline/4` function for per-output audio | +| `test/transcoder_test.exs` | Add tests for multivariant output | +| `examples/multivariant_h264.exs` | New example file (create) | + +--- + +## Challenges and Risks + +### Challenge 1: Bin vs Filter Semantics + +- **Issue**: Filters have direct access to `ctx.pads` with options. Bins also have this, but the pipeline building happens differently. +- **Solution**: Use `handle_pad_added` in Bin to collect options, then reference them when building internal spec. + +### Challenge 2: Pipeline Building Timing + +- **Issue**: Internal spec must be built before playback starts. Output pads can be added after spec is initially created. +- **Solution**: Build the full spec with all output pipelines in `handle_child_notification` when stream format arrives. This is when we know the input format and all outputs must already be linked (enforced by `handle_pad_added`). + +### Challenge 3: Shared Decoding + +- **Issue**: For efficiency, we should decode once and feed to multiple encoders. But Bins build linear pipelines. +- **Solution**: Create the decoding pipeline once, then use a `Tee` element to branch the decoded stream into multiple encoding pipelines (a `Funnel` only merges inputs and cannot split a stream). + + ``` + input -> decoder -> tee + -> encoder_0 -> parser_0 -> funnel_0 -> output_0 + -> encoder_1 -> parser_1 -> funnel_1 -> output_1 + ``` + +### Challenge 4: Output Pad Naming + +- **Issue**: Need to ensure each `via_out` creates a unique pad_ref. +- **Solution**: Use `Pad.ref(:output, index)` pattern. Users provide index, or we auto-increment. + +### Challenge 5: Error Handling + +- **Issue**: If one output pipeline fails, what happens to others? +- **Solution**: Isolate each output pipeline. If one fails, terminate it but keep others running. Or: fail all (simpler, but less robust). 
+- **Decision**: For v1, fail all on any output failure (simpler). Future enhancement: isolate. + +### Challenge 6: Stream Format Notification + +- **Issue**: Each output may have different stream format. Need to notify parent pipeline. +- **Solution**: Each output funnel should emit its stream format to the corresponding output pad. + +--- + +## Validation Rules + +1. **All outputs must be linked before playback starts** (enforced in `handle_pad_added`) +2. **Output pad options must be valid** (validate in `handle_pad_added`) +3. **Cannot mix old and new API** (detect in `handle_child_notification`, raise error) +4. **Each output must have a valid output_stream_format** (inherit or explicit) + +--- + +## Success Criteria + +- [ ] Existing single-output code works without any changes +- [ ] Multiple outputs can be configured with `via_out` +- [ ] Each output can have independent: format, resolution, acceleration +- [ ] All outputs are produced simultaneously from a single input +- [ ] Tests pass for both old and new API +- [ ] Example files demonstrate the feature + +--- + +## Estimation (Rough) + +- Phase 1: 2-3 days (core bin changes) +- Phase 2: 1 day (video pipeline) +- Phase 3: 1 day (audio pipeline) +- Phase 4: 1 day (backward compat edge cases) +- Phase 5: 1-2 days (testing) +- **Total**: 6-8 days + +--- + +## Notes + +- The implementation follows the pattern established by `Membrane.VKVideo.Transcoder` +- Key difference: VKVideo is a Filter with native code, we're extending a Bin with FFmpeg elements +- Shared decoding is important for performance - decode once, encode multiple times +- Consider using `Membrane.Tee` for splitting decoded stream to multiple encoders diff --git a/examples/multivariant_output.exs b/examples/multivariant_output.exs new file mode 100644 index 0000000..7a3f1ea --- /dev/null +++ b/examples/multivariant_output.exs @@ -0,0 +1,91 @@ +Mix.install([ + :membrane_file_plugin, + {:membrane_transcoder_plugin, path: __DIR__ |> 
Path.join("..") |> Path.expand()} +]) + +defmodule Example do + alias Membrane.{H264, H265, VP8, RCPipeline} + require RCPipeline + require Membrane.Pad + + import Membrane.ChildrenSpec + + @doc """ + Transcodes a single H264 input into three output files simultaneously: + - output 0: H264 (annexb, repackaged — no re-encode) + - output 1: H265 (transcoded) + - output 2: VP8 (transcoded) + + Each output pad carries its own `output_stream_format`, `transcoding_policy`, and + `native_acceleration` options, all resolved independently inside the transcoder bin. + """ + def run(input_file, h264_output_file, h265_output_file, vp8_output_file) do + pipeline = RCPipeline.start_link!() + + spec = [ + child(%Membrane.File.Source{location: input_file}) + |> child(:parser, %H264.Parser{ + output_stream_structure: :annexb, + output_alignment: :au, + generate_best_effort_timestamps: %{framerate: {30, 1}} + }) + |> child(:transcoder, Membrane.Transcoder), + + # Output 0 — keep H264, just repackage (no re-encode) + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0), + options: [ + output_stream_format: %H264{alignment: :au, stream_structure: :annexb}, + transcoding_policy: :if_needed + ] + ) + |> child(:h264_sink, %Membrane.File.Sink{location: h264_output_file}), + + # Output 1 — transcode to H265 + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 1), + options: [ + output_stream_format: H265, + transcoding_policy: :always + ] + ) + |> child(:h265_sink, %Membrane.File.Sink{location: h265_output_file}), + + # Output 2 — transcode to VP8 + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 2), + options: [ + output_stream_format: VP8, + transcoding_policy: :always + ] + ) + |> child(:vp8_sink, %Membrane.File.Sink{location: vp8_output_file}) + ] + + RCPipeline.subscribe(pipeline, _any) + RCPipeline.exec_actions(pipeline, spec: spec) + RCPipeline.await_end_of_stream(pipeline, :h264_sink) + RCPipeline.await_end_of_stream(pipeline, :h265_sink) + 
RCPipeline.await_end_of_stream(pipeline, :vp8_sink) + RCPipeline.terminate(pipeline) + end +end + +File.mkdir_p!(Path.join(__DIR__, "tmp")) + +input = Path.join(__DIR__, "../test/fixtures/video.h264") +h264_out = Path.join(__DIR__, "tmp/multivariant_output.h264") +h265_out = Path.join(__DIR__, "tmp/multivariant_output.h265") +vp8_out = Path.join(__DIR__, "tmp/multivariant_output.ivf") + +IO.puts("Input: #{input}") +IO.puts("H264 output: #{h264_out}") +IO.puts("H265 output: #{h265_out}") +IO.puts("VP8 output: #{vp8_out}") +IO.puts("") +Example.run(input, h264_out, h265_out, vp8_out) + +IO.puts("Done.") +IO.puts(" #{h264_out} (#{File.stat!(h264_out).size} bytes)") +IO.puts(" #{h265_out} (#{File.stat!(h265_out).size} bytes)") +IO.puts(" #{vp8_out} (#{File.stat!(vp8_out).size} bytes)") diff --git a/lib/transcoder.ex b/lib/transcoder.ex index 411d8d5..9a1eb0a 100644 --- a/lib/transcoder.ex +++ b/lib/transcoder.ex @@ -29,15 +29,26 @@ defmodule Membrane.Transcoder do When the `membrane_vk_video_plugin` dependency is present and Vulkan hardware is available, H.264 encode/decode can be offloaded to the GPU by setting `native_acceleration: :if_available`. + + ## Usage + + child(:transcoder, Membrane.Transcoder), + get_child(:transcoder) + |> via_out(Pad.ref(:output, 0), options: [output_stream_format: H264, width: 1280, height: 720]) + |> child(:hd_sink, Membrane.File.Sink), + get_child(:transcoder) + |> via_out(Pad.ref(:output, 1), options: [output_stream_format: H264, width: 640, height: 360]) + |> child(:sd_sink, Membrane.File.Sink) """ use Membrane.Bin require __MODULE__.Audio require __MODULE__.Video require Membrane.Logger + require Membrane.Pad alias __MODULE__.{Audio, Video} - alias Membrane.{AAC, Funnel, H264, H265, Opus, RawAudio, RawVideo, RemoteStream, VP8, VP9} + alias Membrane.{AAC, Funnel, H264, H265, Opus, Pad, RawAudio, RawVideo, RemoteStream, VP8, VP9} @typedoc """ Describes stream formats acceptable on the bin's input and output. 
@@ -82,22 +93,60 @@ defmodule Membrane.Transcoder do format.__struct__ == RemoteStream def_output_pad :output, - accepted_format: format when Audio.is_audio_format(format) or Video.is_video_format(format) + availability: :on_request, + accepted_format: format when Audio.is_audio_format(format) or Video.is_video_format(format), + options: [ + output_stream_format: [ + spec: + stream_format() + | stream_format_module() + | stream_format_tuple() + | stream_format_resolver() + | nil, + default: nil, + description: """ + Per-output stream format. Inherits from bin's `output_stream_format` option if nil. + """ + ], + transcoding_policy: [ + spec: + :always + | :if_needed + | :never + | (stream_format() -> :always | :if_needed | :never) + | nil, + default: nil, + description: """ + Per-output transcoding policy. Inherits from bin's `transcoding_policy` option if nil. + """ + ], + native_acceleration: [ + spec: :never | :if_available | nil, + default: nil, + description: """ + Per-output native acceleration setting. Inherits from bin's `native_acceleration` option if nil. + """ + ] + ] def_options output_stream_format: [ spec: stream_format() | stream_format_module() | stream_format_tuple() - | stream_format_resolver(), + | stream_format_resolver() + | nil, + default: nil, description: """ - An option specifying desired output format. + An option specifying the desired output format for all outputs. Can be either: * a struct being a Membrane stream format, * a module in which Membrane stream format struct is defined, * a function which receives input stream format as an input argument and is supposed to return the desired output stream format or its module. + + When using per-output `via_out` options, individual outputs can override this value. 
""" ], transcoding_policy: [ @@ -157,20 +206,17 @@ defmodule Membrane.Transcoder do @impl true def handle_init(_ctx, opts) do - spec = [ + spec = bin_input() |> maybe_plug_stream_format_changer(opts.assumed_input_stream_format) - |> child(:connector, %Membrane.Connector{notify_on_stream_format?: true}), - child(:output_funnel, Funnel) - |> bin_output() - ] + |> child(:connector, %Membrane.Connector{notify_on_stream_format?: true}) state = opts |> Map.from_struct() |> Map.merge(%{ input_stream_format: nil, - use_hardware_acceleration?: should_use_hardware_acceleration?(opts.native_acceleration) + output_specs: %{} }) {[spec: spec], state} @@ -203,29 +249,90 @@ defmodule Membrane.Transcoder do }) end + @impl true + def handle_pad_added(Pad.ref(:output, pad_id) = pad_ref, ctx, state) do + pad_opts = ctx.pads[pad_ref].options + + suffix = pad_id_to_suffix(pad_id) + funnel_name = :"funnel_#{suffix}" + + output_spec = %{ + output_stream_format: pad_opts.output_stream_format || state.output_stream_format, + transcoding_policy: pad_opts.transcoding_policy || state.transcoding_policy, + native_acceleration: pad_opts.native_acceleration || state.native_acceleration, + funnel_name: funnel_name, + suffix: suffix, + pad_id: pad_id + } + + spec = child(funnel_name, Funnel) |> bin_output(pad_ref) + + {[spec: spec], %{state | output_specs: Map.put(state.output_specs, pad_ref, output_spec)}} + end + + @impl true + def handle_pad_removed(Pad.ref(:output, _id) = pad_ref, _ctx, state) do + {[], %{state | output_specs: Map.delete(state.output_specs, pad_ref)}} + end + @impl true def handle_child_notification({:stream_format, _pad, format}, :connector, _ctx, state) when state.input_stream_format == nil do - state = - %{state | input_stream_format: format} - |> resolve_output_stream_format() - - state = - with %{transcoding_policy: f} when is_function(f) <- state do - %{state | transcoding_policy: f.(format)} + state = %{state | input_stream_format: format} + + output_specs_list = 
Map.to_list(state.output_specs) + single_output? = length(output_specs_list) == 1 + + specs = + if single_output? do + [{_pad_ref, output_spec}] = output_specs_list + use_hw? = should_use_hardware_acceleration?(output_spec.native_acceleration) + resolved_format = resolve_output_stream_format(output_spec.output_stream_format, format) + + transcoding_policy = resolve_transcoding_policy(output_spec.transcoding_policy, format) + + [ + get_child(:connector) + |> plug_transcoding( + format, + resolved_format, + transcoding_policy, + use_hw?, + output_spec.suffix + ) + |> get_child(output_spec.funnel_name) + ] + else + # Build tee and all output pipelines in a single spec so the tee + # is never in a state where data flows through it without outputs connected. + tee_spec = get_child(:connector) |> child(:tee, Membrane.Tee.Parallel) + + output_pipeline_specs = + Enum.map(output_specs_list, fn {_pad_ref, output_spec} -> + use_hw? = should_use_hardware_acceleration?(output_spec.native_acceleration) + + resolved_format = + resolve_output_stream_format(output_spec.output_stream_format, format) + + transcoding_policy = + resolve_transcoding_policy(output_spec.transcoding_policy, format) + + get_child(:tee) + |> via_out(Pad.ref(:output, output_spec.pad_id)) + |> plug_transcoding( + format, + resolved_format, + transcoding_policy, + use_hw?, + output_spec.suffix + ) + |> get_child(output_spec.funnel_name) + end) + + [tee_spec | output_pipeline_specs] end - spec = - get_child(:connector) - |> plug_transcoding( - format, - state.output_stream_format, - state.transcoding_policy, - state.use_hardware_acceleration? 
- ) - |> get_child(:output_funnel) - - {[spec: spec], state} + {[spec: specs], state} end @impl true @@ -249,20 +356,24 @@ defmodule Membrane.Transcoder do {[], state} end - defp resolve_output_stream_format(state) do - case state.output_stream_format do + defp resolve_transcoding_policy(f, format) when is_function(f), do: f.(format) + defp resolve_transcoding_policy(policy, _format), do: policy + + defp resolve_output_stream_format(nil, input_format), do: input_format + + defp resolve_output_stream_format(output_stream_format, input_format) do + case output_stream_format do format when is_struct(format) -> - state + format module when is_atom(module) -> - %{state | output_stream_format: struct(module)} + struct(module) {module, opts} when is_atom(module) and is_list(opts) -> - %{state | output_stream_format: struct(module, opts)} + struct(module, opts) resolver when is_function(resolver) -> - %{state | output_stream_format: resolver.(state.input_stream_format)} - |> resolve_output_stream_format() + resolve_output_stream_format(resolver.(input_format), input_format) end end @@ -271,11 +382,12 @@ defmodule Membrane.Transcoder do input_format, output_format, transcoding_policy, - _use_hardware_acceleration? + _use_hardware_acceleration?, + suffix ) when Audio.is_audio_format(input_format) do builder - |> Audio.plug_audio_transcoding(input_format, output_format, transcoding_policy) + |> Audio.plug_audio_transcoding(input_format, output_format, transcoding_policy, suffix) end defp plug_transcoding( @@ -283,7 +395,8 @@ defmodule Membrane.Transcoder do input_format, output_format, transcoding_policy, - use_hardware_acceleration? + use_hardware_acceleration?, + suffix ) when Video.is_video_format(input_format) do builder @@ -291,7 +404,12 @@ defmodule Membrane.Transcoder do input_format, output_format, transcoding_policy, - use_hardware_acceleration? 
+ use_hardware_acceleration?, + suffix ) end + + defp pad_id_to_suffix(id) when is_integer(id), do: "output_#{id}" + defp pad_id_to_suffix(id) when is_atom(id), do: "output_#{id}" + defp pad_id_to_suffix(id), do: "output_#{:erlang.phash2(id)}" end diff --git a/lib/transcoder/audio.ex b/lib/transcoder/audio.ex index 8833f6f..e33993d 100644 --- a/lib/transcoder/audio.ex +++ b/lib/transcoder/audio.ex @@ -64,18 +64,26 @@ defmodule Membrane.Transcoder.Audio do ChildrenSpec.builder(), audio_stream_format() | RemoteStream.t(), audio_stream_format(), - boolean() + :always | :if_needed | :never, + String.t() | nil ) :: ChildrenSpec.builder() - def plug_audio_transcoding(builder, input_format, output_format, transcoding_policy) + def plug_audio_transcoding( + builder, + input_format, + output_format, + transcoding_policy, + suffix \\ nil + ) when is_audio_format(input_format) and is_audio_format(output_format) do - do_plug_audio_transcoding(builder, input_format, output_format, transcoding_policy) + do_plug_audio_transcoding(builder, input_format, output_format, transcoding_policy, suffix) end defp do_plug_audio_transcoding( builder, %format_module{}, %format_module{}, - transcoding_policy + transcoding_policy, + _suffix ) when transcoding_policy in [:if_needed, :never] do Membrane.Logger.debug(""" @@ -89,63 +97,70 @@ defmodule Membrane.Transcoder.Audio do builder, %RemoteStream{content_format: Opus}, %Opus{}, - transcoding_policy + transcoding_policy, + suffix ) when transcoding_policy in [:if_needed, :never] do - builder |> child(:opus_parser, Opus.Parser) + builder |> child(child_name(suffix, :opus_parser), Opus.Parser) end - defp do_plug_audio_transcoding(_builder, input_format, output_format, :never) do + defp do_plug_audio_transcoding(_builder, input_format, output_format, :never, _suffix) do raise """ Cannot convert input format #{inspect(input_format)} to output format #{inspect(output_format)} \ with :transcoding_policy option set to :never. 
""" end - defp do_plug_audio_transcoding(builder, input_format, output_format, _transcoding_policy) do + defp do_plug_audio_transcoding( + builder, + input_format, + output_format, + _transcoding_policy, + suffix + ) do builder - |> maybe_plug_parser(input_format) - |> maybe_plug_decoder(input_format) - |> maybe_plug_resampler(input_format, output_format) - |> maybe_plug_encoder(output_format) + |> maybe_plug_parser(input_format, suffix) + |> maybe_plug_decoder(input_format, suffix) + |> maybe_plug_resampler(input_format, output_format, suffix) + |> maybe_plug_encoder(output_format, suffix) end - defp maybe_plug_parser(builder, %AAC{}) do - builder |> child(:aac_parser, AAC.Parser) + defp maybe_plug_parser(builder, %AAC{}, suffix) do + builder |> child(child_name(suffix, :aac_parser), AAC.Parser) end - defp maybe_plug_parser(builder, _input_format) do + defp maybe_plug_parser(builder, _input_format, _suffix) do builder end - defp maybe_plug_decoder(builder, %Opus{}) do - builder |> child(:opus_decoder, Opus.Decoder) + defp maybe_plug_decoder(builder, %Opus{}, suffix) do + builder |> child(child_name(suffix, :opus_decoder), Opus.Decoder) end - defp maybe_plug_decoder(builder, %RemoteStream{content_format: Opus, type: :packetized}) do - builder |> child(:opus_decoder, Opus.Decoder) + defp maybe_plug_decoder(builder, %RemoteStream{content_format: Opus, type: :packetized}, suffix) do + builder |> child(child_name(suffix, :opus_decoder), Opus.Decoder) end - defp maybe_plug_decoder(builder, %AAC{}) do - builder |> child(:aac_decoder, AAC.FDK.Decoder) + defp maybe_plug_decoder(builder, %AAC{}, suffix) do + builder |> child(child_name(suffix, :aac_decoder), AAC.FDK.Decoder) end - defp maybe_plug_decoder(builder, %MPEGAudio{}) do - builder |> child(:mp3_decoder, Membrane.MP3.MAD.Decoder) + defp maybe_plug_decoder(builder, %MPEGAudio{}, suffix) do + builder |> child(child_name(suffix, :mp3_decoder), Membrane.MP3.MAD.Decoder) end - defp maybe_plug_decoder(builder, 
%RemoteStream{content_format: MPEGAudio}) do - builder |> child(:mp3_decoder, Membrane.MP3.MAD.Decoder) + defp maybe_plug_decoder(builder, %RemoteStream{content_format: MPEGAudio}, suffix) do + builder |> child(child_name(suffix, :mp3_decoder), Membrane.MP3.MAD.Decoder) end - defp maybe_plug_decoder(builder, %RawAudio{}) do + defp maybe_plug_decoder(builder, %RawAudio{}, _suffix) do builder end - defp maybe_plug_resampler(builder, input_format, %Opus{}) + defp maybe_plug_resampler(builder, input_format, %Opus{}, suffix) when not is_opus_compliant(input_format) do builder - |> child(:resampler, %Membrane.FFmpeg.SWResample.Converter{ + |> child(child_name(suffix, :resampler), %Membrane.FFmpeg.SWResample.Converter{ output_stream_format: %RawAudio{ sample_format: :s16le, sample_rate: 48_000, @@ -154,10 +169,10 @@ defmodule Membrane.Transcoder.Audio do }) end - defp maybe_plug_resampler(builder, input_format, %AAC{}) + defp maybe_plug_resampler(builder, input_format, %AAC{}, suffix) when not is_aac_compliant(input_format) do builder - |> child(:resampler, %Membrane.FFmpeg.SWResample.Converter{ + |> child(child_name(suffix, :resampler), %Membrane.FFmpeg.SWResample.Converter{ output_stream_format: %RawAudio{ sample_format: :s16le, sample_rate: 44_100, @@ -166,31 +181,34 @@ defmodule Membrane.Transcoder.Audio do }) end - defp maybe_plug_resampler(builder, input_format, %MPEGAudio{}) + defp maybe_plug_resampler(builder, input_format, %MPEGAudio{}, suffix) when not is_mp3_compliant(input_format) do builder - |> child(:resampler, %Membrane.FFmpeg.SWResample.Converter{ + |> child(child_name(suffix, :resampler), %Membrane.FFmpeg.SWResample.Converter{ output_stream_format: %RawAudio{sample_rate: 44_100, sample_format: :s32le, channels: 2} }) end - defp maybe_plug_resampler(builder, _input_format, _output_format) do + defp maybe_plug_resampler(builder, _input_format, _output_format, _suffix) do builder end - defp maybe_plug_encoder(builder, %Opus{}) do - builder |> 
child(:opus_encoder, Opus.Encoder) + defp maybe_plug_encoder(builder, %Opus{}, suffix) do + builder |> child(child_name(suffix, :opus_encoder), Opus.Encoder) end - defp maybe_plug_encoder(builder, %AAC{}) do - builder |> child(:aac_encoder, AAC.FDK.Encoder) + defp maybe_plug_encoder(builder, %AAC{}, suffix) do + builder |> child(child_name(suffix, :aac_encoder), AAC.FDK.Encoder) end - defp maybe_plug_encoder(builder, %MPEGAudio{}) do - builder |> child(:mp3_encoder, Membrane.MP3.Lame.Encoder) + defp maybe_plug_encoder(builder, %MPEGAudio{}, suffix) do + builder |> child(child_name(suffix, :mp3_encoder), Membrane.MP3.Lame.Encoder) end - defp maybe_plug_encoder(builder, %RawAudio{}) do + defp maybe_plug_encoder(builder, %RawAudio{}, _suffix) do builder end + + defp child_name(nil, base), do: base + defp child_name(suffix, base), do: :"#{suffix}_#{base}" end diff --git a/lib/transcoder/video.ex b/lib/transcoder/video.ex index 5e2103a..2354675 100644 --- a/lib/transcoder/video.ex +++ b/lib/transcoder/video.ex @@ -34,14 +34,16 @@ defmodule Membrane.Transcoder.Video do video_stream_format(), video_stream_format(), :always | :if_needed | :never, - boolean() + boolean(), + String.t() | nil ) :: ChildrenSpec.builder() def plug_video_transcoding( builder, input_format, output_format, transcoding_policy, - use_hardware_acceleration? + use_hardware_acceleration?, + suffix \\ nil ) when is_video_format(input_format) and is_video_format(output_format) do do_plug_video_transcoding( @@ -49,7 +51,8 @@ defmodule Membrane.Transcoder.Video do input_format, output_format, transcoding_policy, - use_hardware_acceleration? + use_hardware_acceleration?, + suffix ) end @@ -58,7 +61,8 @@ defmodule Membrane.Transcoder.Video do %RemoteStream{content_format: h26x}, output_format, transcoding_policy, - use_hardware_acceleration? 
+ use_hardware_acceleration?, + suffix ) when h26x in [H264, H265] do do_plug_video_transcoding( @@ -66,7 +70,8 @@ defmodule Membrane.Transcoder.Video do struct!(h26x), output_format, transcoding_policy, - use_hardware_acceleration? + use_hardware_acceleration?, + suffix ) end @@ -75,11 +80,12 @@ defmodule Membrane.Transcoder.Video do %H264{}, %H264{} = output_format, transcoding_policy, - _use_hardware_acceleration? + _use_hardware_acceleration?, + suffix ) when transcoding_policy in [:if_needed, :never] do builder - |> child(:h264_parser, %H264.Parser{ + |> child(child_name(suffix, :h264_parser), %H264.Parser{ output_stream_structure: stream_structure_type(output_format), output_alignment: output_format.alignment }) @@ -90,11 +96,12 @@ defmodule Membrane.Transcoder.Video do %H265{}, %H265{} = output_format, transcoding_policy, - _use_hardware_acceleration? + _use_hardware_acceleration?, + suffix ) when transcoding_policy in [:if_needed, :never] do builder - |> child(:h265_parser, %H265.Parser{ + |> child(child_name(suffix, :h265_parser), %H265.Parser{ output_stream_structure: stream_structure_type(output_format), output_alignment: output_format.alignment }) @@ -105,10 +112,11 @@ defmodule Membrane.Transcoder.Video do %RawVideo{} = input_format, %RawVideo{} = output_format, _transcoding_policy, - true + true, + suffix ) do builder - |> maybe_plug_swscale_converter_vulkan(input_format, output_format) + |> maybe_plug_swscale_converter_vulkan(input_format, output_format, suffix) end defp do_plug_video_transcoding( @@ -116,10 +124,11 @@ defmodule Membrane.Transcoder.Video do %RawVideo{} = input_format, %RawVideo{} = output_format, _transcoding_policy, - false + false, + suffix ) do builder - |> maybe_plug_swscale_converter(input_format, output_format) + |> maybe_plug_swscale_converter(input_format, output_format, suffix) end defp do_plug_video_transcoding( @@ -127,7 +136,8 @@ defmodule Membrane.Transcoder.Video do %format_module{}, %format_module{}, 
transcoding_policy, - _use_hardware_acceleration? + _use_hardware_acceleration?, + _suffix ) when transcoding_policy in [:if_needed, :never] do Membrane.Logger.debug(""" @@ -142,7 +152,8 @@ defmodule Membrane.Transcoder.Video do input_format, output_format, :never, - _use_hardware_acceleration? + _use_hardware_acceleration?, + _suffix ), do: raise(""" @@ -150,101 +161,133 @@ defmodule Membrane.Transcoder.Video do with :transcoding_policy option set to :never. """) - defp do_plug_video_transcoding(builder, input_format, output_format, _transcoding_policy, true) do + defp do_plug_video_transcoding( + builder, + input_format, + output_format, + _transcoding_policy, + true, + suffix + ) do builder - |> maybe_plug_parser_and_decoder_vulkan(input_format) - |> maybe_plug_swscale_converter_vulkan(input_format, output_format) - |> maybe_plug_encoder_and_parser_vulkan(output_format) + |> maybe_plug_parser_and_decoder_vulkan(input_format, suffix) + |> maybe_plug_swscale_converter_vulkan(input_format, output_format, suffix) + |> maybe_plug_encoder_and_parser_vulkan(output_format, suffix) end - defp do_plug_video_transcoding(builder, input_format, output_format, _transcoding_policy, false) do + defp do_plug_video_transcoding( + builder, + input_format, + output_format, + _transcoding_policy, + false, + suffix + ) do builder - |> maybe_plug_parser_and_decoder(input_format) - |> maybe_plug_swscale_converter(input_format, output_format) - |> maybe_plug_encoder_and_parser(output_format) + |> maybe_plug_parser_and_decoder(input_format, suffix) + |> maybe_plug_swscale_converter(input_format, output_format, suffix) + |> maybe_plug_encoder_and_parser(output_format, suffix) end # VK-specific decoder: child name :vk_h264_decoder distinguishes it from FFmpeg's :h264_decoder - defp maybe_plug_parser_and_decoder_vulkan(builder, %H264{}) do + defp maybe_plug_parser_and_decoder_vulkan(builder, %H264{}, suffix) do builder - |> child(:h264_input_parser, %H264.Parser{ + |> 
child(child_name(suffix, :h264_input_parser), %H264.Parser{ output_stream_structure: :annexb, output_alignment: :au }) - |> child(:vk_h264_decoder, Membrane.VKVideo.Decoder) + |> child(child_name(suffix, :vk_h264_decoder), Membrane.VKVideo.Decoder) end - defp maybe_plug_parser_and_decoder_vulkan(builder, format), - do: maybe_plug_parser_and_decoder(builder, format) + defp maybe_plug_parser_and_decoder_vulkan(builder, format, suffix), + do: maybe_plug_parser_and_decoder(builder, format, suffix) - defp maybe_plug_parser_and_decoder(builder, %H264{}) do + defp maybe_plug_parser_and_decoder(builder, %H264{}, suffix) do builder - |> child(:h264_input_parser, %H264.Parser{ + |> child(child_name(suffix, :h264_input_parser), %H264.Parser{ output_stream_structure: :annexb, output_alignment: :au }) - |> child(:h264_decoder, %H264.FFmpeg.Decoder{}) + |> child(child_name(suffix, :h264_decoder), %H264.FFmpeg.Decoder{}) end - defp maybe_plug_parser_and_decoder(builder, %H265{}) do + defp maybe_plug_parser_and_decoder(builder, %H265{}, suffix) do builder - |> child(:h265_input_parser, %H265.Parser{ + |> child(child_name(suffix, :h265_input_parser), %H265.Parser{ output_stream_structure: :annexb, output_alignment: :au }) - |> child(:h265_decoder, %H265.FFmpeg.Decoder{}) + |> child(child_name(suffix, :h265_decoder), %H265.FFmpeg.Decoder{}) end - defp maybe_plug_parser_and_decoder(builder, %vpx{}) when vpx in [VP8, VP9] do + defp maybe_plug_parser_and_decoder(builder, %vpx{}, suffix) when vpx in [VP8, VP9] do decoder_module = Module.concat(vpx, Decoder) - builder |> child(:vp8_decoder, decoder_module) + builder |> child(child_name(suffix, :vp8_decoder), decoder_module) end - defp maybe_plug_parser_and_decoder(builder, %RemoteStream{ - content_format: vpx, - type: :packetized - }) + defp maybe_plug_parser_and_decoder( + builder, + %RemoteStream{content_format: vpx, type: :packetized}, + suffix + ) when vpx in [VP8, VP9] do decoder_module = Module.concat(vpx, Decoder) - builder |> 
child(:vp8_decoder, decoder_module) + builder |> child(child_name(suffix, :vp8_decoder), decoder_module) end - defp maybe_plug_parser_and_decoder(builder, %RawVideo{}), do: builder - - # VK decoder outputs NV12; these clauses handle the NV12 <-> other-format conversion - # when mixing VK decode/encode with different pixel formats. + defp maybe_plug_parser_and_decoder(builder, %RawVideo{}, _suffix), do: builder - defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, %RawVideo{pixel_format: nil}), - do: builder + defp maybe_plug_swscale_converter_vulkan( + builder, + %H264{}, + %RawVideo{pixel_format: nil}, + _suffix + ), + do: builder - defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, %RawVideo{pixel_format: :NV12}), - do: builder + defp maybe_plug_swscale_converter_vulkan( + builder, + %H264{}, + %RawVideo{pixel_format: :NV12}, + _suffix + ), + do: builder - defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, %RawVideo{} = output_format) do - builder |> child(:raw_video_converter, %SWScale.Converter{format: output_format.pixel_format}) + defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, %RawVideo{} = output_format, suffix) do + builder + |> child(child_name(suffix, :raw_video_converter), %SWScale.Converter{ + format: output_format.pixel_format + }) end - defp maybe_plug_swscale_converter_vulkan(builder, %RawVideo{pixel_format: :NV12}, %H264{}), - do: builder + defp maybe_plug_swscale_converter_vulkan( + builder, + %RawVideo{pixel_format: :NV12}, + %H264{}, + _suffix + ), + do: builder - defp maybe_plug_swscale_converter_vulkan(builder, %RawVideo{}, %H264{}) do - builder |> child(:raw_video_converter, %SWScale.Converter{format: :NV12}) + defp maybe_plug_swscale_converter_vulkan(builder, %RawVideo{}, %H264{}, suffix) do + builder |> child(child_name(suffix, :raw_video_converter), %SWScale.Converter{format: :NV12}) end - defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, %H264{}), do: builder + defp 
maybe_plug_swscale_converter_vulkan(builder, %H264{}, %H264{}, _suffix), do: builder - defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, _output_format) do - builder |> child(:raw_video_converter, %SWScale.Converter{format: :I420}) + defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, _output_format, suffix) do + builder + |> child(child_name(suffix, :raw_video_converter), %SWScale.Converter{format: :I420}) end - defp maybe_plug_swscale_converter_vulkan(builder, _input_format, %H264{}) do - builder |> child(:raw_video_converter, %SWScale.Converter{format: :NV12}) + defp maybe_plug_swscale_converter_vulkan(builder, _input_format, %H264{}, suffix) do + builder + |> child(child_name(suffix, :raw_video_converter), %SWScale.Converter{format: :NV12}) end - defp maybe_plug_swscale_converter_vulkan(builder, input_format, output_format), - do: maybe_plug_swscale_converter(builder, input_format, output_format) + defp maybe_plug_swscale_converter_vulkan(builder, input_format, output_format, suffix), + do: maybe_plug_swscale_converter(builder, input_format, output_format, suffix) - defp maybe_plug_swscale_converter(builder, input_format, %RawVideo{} = output_format) do + defp maybe_plug_swscale_converter(builder, input_format, %RawVideo{} = output_format, suffix) do case input_format do _any when output_format.pixel_format == nil -> builder @@ -254,11 +297,14 @@ defmodule Membrane.Transcoder.Video do _input_format -> builder - |> child(:raw_video_converter, %SWScale.Converter{format: output_format.pixel_format}) + |> child(child_name(suffix, :raw_video_converter), %SWScale.Converter{ + format: output_format.pixel_format + }) end end - defp maybe_plug_swscale_converter(builder, input_format, %h26x{}) when h26x in [H264, H265] do + defp maybe_plug_swscale_converter(builder, input_format, %h26x{}, suffix) + when h26x in [H264, H265] do case input_format do %RawVideo{pixel_format: pixel_format} when pixel_format in [:I420, :I422] -> builder @@ -268,43 +314,43 @@ 
defmodule Membrane.Transcoder.Video do _input_format -> builder - |> child(:raw_video_converter, %SWScale.Converter{format: :I420}) + |> child(child_name(suffix, :raw_video_converter), %SWScale.Converter{format: :I420}) end end - defp maybe_plug_swscale_converter(builder, _input_format, _output_format), do: builder + defp maybe_plug_swscale_converter(builder, _input_format, _output_format, _suffix), do: builder - defp maybe_plug_encoder_and_parser_vulkan(builder, %H264{} = h264) do + defp maybe_plug_encoder_and_parser_vulkan(builder, %H264{} = h264, suffix) do builder - |> child(:vk_h264_encoder, Membrane.VKVideo.Encoder) - |> child(:h264_output_parser, %H264.Parser{ + |> child(child_name(suffix, :vk_h264_encoder), Membrane.VKVideo.Encoder) + |> child(child_name(suffix, :h264_output_parser), %H264.Parser{ output_stream_structure: stream_structure_type(h264), output_alignment: h264.alignment }) end - defp maybe_plug_encoder_and_parser_vulkan(builder, format), - do: maybe_plug_encoder_and_parser(builder, format) + defp maybe_plug_encoder_and_parser_vulkan(builder, format, suffix), + do: maybe_plug_encoder_and_parser(builder, format, suffix) - defp maybe_plug_encoder_and_parser(builder, %H264{} = h264) do + defp maybe_plug_encoder_and_parser(builder, %H264{} = h264, suffix) do builder - |> child(:h264_encoder, %H264.FFmpeg.Encoder{preset: :ultrafast}) - |> child(:h264_output_parser, %H264.Parser{ + |> child(child_name(suffix, :h264_encoder), %H264.FFmpeg.Encoder{preset: :ultrafast}) + |> child(child_name(suffix, :h264_output_parser), %H264.Parser{ output_stream_structure: stream_structure_type(h264), output_alignment: h264.alignment }) end - defp maybe_plug_encoder_and_parser(builder, %H265{} = h265) do + defp maybe_plug_encoder_and_parser(builder, %H265{} = h265, suffix) do builder - |> child(:h265_encoder, %H265.FFmpeg.Encoder{preset: :ultrafast}) - |> child(:h265_output_parser, %H265.Parser{ + |> child(child_name(suffix, :h265_encoder), %H265.FFmpeg.Encoder{preset: 
:ultrafast}) + |> child(child_name(suffix, :h265_output_parser), %H265.Parser{ output_stream_structure: stream_structure_type(h265), output_alignment: h265.alignment }) end - defp maybe_plug_encoder_and_parser(builder, %VP8{}) do + defp maybe_plug_encoder_and_parser(builder, %VP8{}, suffix) do cpu_quota = :erlang.system_info(:cpu_quota) number_of_threads = @@ -312,10 +358,14 @@ defmodule Membrane.Transcoder.Video do do: cpu_quota, else: :erlang.system_info(:logical_processors_available) - builder |> child(:vp8_encoder, %VP8.Encoder{g_threads: number_of_threads, cpu_used: 15}) + builder + |> child(child_name(suffix, :vp8_encoder), %VP8.Encoder{ + g_threads: number_of_threads, + cpu_used: 15 + }) end - defp maybe_plug_encoder_and_parser(builder, %VP9{}) do + defp maybe_plug_encoder_and_parser(builder, %VP9{}, suffix) do cpu_quota = :erlang.system_info(:cpu_quota) number_of_threads = @@ -323,10 +373,14 @@ defmodule Membrane.Transcoder.Video do do: cpu_quota, else: :erlang.system_info(:logical_processors_available) - builder |> child(:vp8_encoder, %VP9.Encoder{g_threads: number_of_threads, cpu_used: 15}) + builder + |> child(child_name(suffix, :vp8_encoder), %VP9.Encoder{ + g_threads: number_of_threads, + cpu_used: 15 + }) end - defp maybe_plug_encoder_and_parser(builder, %RawVideo{}), do: builder + defp maybe_plug_encoder_and_parser(builder, %RawVideo{}, _suffix), do: builder defp stream_structure_type(%h26x{stream_structure: stream_structure}) when h26x in [H264, H265] do @@ -335,4 +389,7 @@ defmodule Membrane.Transcoder.Video do {type, _dcr} when type in [:avc1, :avc3, :hvc1, :hev1] -> type end end + + defp child_name(nil, base), do: base + defp child_name(suffix, base), do: :"#{suffix}_#{base}" end diff --git a/mix.exs b/mix.exs index 79227d8..e5b1ffc 100644 --- a/mix.exs +++ b/mix.exs @@ -39,6 +39,7 @@ defmodule Membrane.Transcoder.Plugin.Mixfile do [ {:membrane_vk_video_plugin, "~> 0.2.0", optional: true}, {:membrane_core, "~> 1.2 and >= 1.2.1"}, + 
{:membrane_tee_plugin, "~> 0.12.0"}, {:membrane_opus_plugin, "~> 0.20.3"}, {:membrane_aac_plugin, "~> 0.19.0"}, {:membrane_aac_fdk_plugin, "~> 0.18.0"}, diff --git a/mix.lock b/mix.lock index 2941492..0916b6b 100644 --- a/mix.lock +++ b/mix.lock @@ -44,6 +44,7 @@ "membrane_raw_audio_format": {:hex, :membrane_raw_audio_format, "0.12.0", "b574cd90f69ce2a8b6201b0ccf0826ca28b0fbc8245b8078d9f11cef65f7d5d5", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "6e6c98e3622a2b9df19eab50ba65d7eb45949b1ba306fa8423df6cdb12fd0b44"}, "membrane_raw_audio_parser_plugin": {:hex, :membrane_raw_audio_parser_plugin, "0.4.0", "7a1e53b68a221d00e47fb5d3c7e29200dfe8f7bc0862e69000b61c6562093acc", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}], "hexpm", "ff8d3fba45b1c2814b68d49878f19d2c1ad1147b53f606b48b6b67068435dcd0"}, "membrane_raw_video_format": {:hex, :membrane_raw_video_format, "0.4.3", "61b2f6afdffa43c25de5a433c3a3bed933144be0f753b6fc8c6a9f255382eaff", [:mix], [{:image, ">= 0.54.0", [hex: :image, repo: "hexpm", optional: true]}], "hexpm", "11739a7d956d037f3ee109f06f075f1a99fea000c778628ac58ed28637e4c637"}, + "membrane_tee_plugin": {:hex, :membrane_tee_plugin, "0.12.0", "f94989b4080ef4b7937d74c1a14d3379577c7bd4c6d06e5a2bb41c351ad604d4", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "0d61c9ed5e68e5a75d54200e1c6df5739c0bcb52fee0974183ad72446a179887"}, "membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: 
"hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"}, "membrane_vk_video_plugin": {:hex, :membrane_vk_video_plugin, "0.2.1", "7304ae294ca44279e8dcc5e3206ed31157467e9d65b12cb7e4db246fb1d58dd4", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.4.2", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}, {:rustler, "~> 0.37.1", [hex: :rustler, repo: "hexpm", optional: false]}], "hexpm", "19bdae7d8bbcdc110d4aa991d5cb7154a743a985a30842cdc24012b402258682"}, "membrane_vp8_format": {:hex, :membrane_vp8_format, "0.5.0", "a589c20bb9d97ddc9b717684d00cefc84e2500ce63a0c33c4b9618d9b2f9b2ea", [:mix], [], "hexpm", "d29e0dae4bebc6838e82e031c181fe626d168c687e4bc617c1d0772bdeed19d5"}, diff --git a/test/integration_test.exs b/test/integration_test.exs index 967e17b..65ed8a8 100644 --- a/test/integration_test.exs +++ b/test/integration_test.exs @@ -3,6 +3,8 @@ defmodule Membrane.Transcoder.IntegrationTest do import Membrane.Testing.Assertions import Membrane.ChildrenSpec + require Membrane.Pad + alias Membrane.{AAC, H264, H265, MPEGAudio, Opus, RawAudio, RawVideo, VP8, VP9} alias Membrane.Testing alias Membrane.Transcoder.Support.Preprocessors @@ -59,10 +61,11 @@ defmodule Membrane.Transcoder.IntegrationTest do location: Path.join("./test/fixtures", unquote(test_case.input_file)) }) |> then(unquote(test_case.preprocess)) - |> child(%Membrane.Transcoder{ + |> child(:transcoder, %Membrane.Transcoder{ output_stream_format: unquote(test_case.output_format), assumed_input_stream_format: override_input_stream_format }) + |> via_out(Membrane.Pad.ref(:output, 0)) |> child(:sink, Testing.Sink) Testing.Pipeline.execute_actions(pid, spec: spec) @@ -115,6 +118,27 @@ defmodule 
Membrane.Transcoder.IntegrationTest do output_stream_format: test_case.output_format, native_acceleration: native_acceleration }) + |> via_out(Membrane.Pad.ref(:output, 0)) + |> child(:sink, %Membrane.File.Sink{location: tmp_path}) + + Testing.Pipeline.execute_actions(pid, spec: spec) + assert_end_of_stream(pid, :sink, :input, 30_000) + Testing.Pipeline.terminate(pid) + + bytes = File.read!(tmp_path) + File.rm(tmp_path) + bytes + end + + defp transcode_to_bytes(input_file, preprocess, output_format) do + tmp_path = Path.join(System.tmp_dir!(), "ref_#{:erlang.unique_integer([:positive])}") + pid = Testing.Pipeline.start_link_supervised!() + + spec = + child(%Membrane.File.Source{location: input_file}) + |> then(preprocess) + |> child(:transcoder, %Membrane.Transcoder{output_stream_format: output_format}) + |> via_out(Membrane.Pad.ref(:output, 0)) |> child(:sink, %Membrane.File.Sink{location: tmp_path}) Testing.Pipeline.execute_actions(pid, spec: spec) @@ -173,13 +197,16 @@ defmodule Membrane.Transcoder.IntegrationTest do %AAC{} -> format end - spec = + spec = [ child(:source, %FormatSource{format: format}) |> child(:transcoder, %Membrane.Transcoder{ output_stream_format: output_format, transcoding_policy: transcoding_policy - }) + }), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0)) |> child(:sink, Testing.Sink) + ] pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) @@ -189,8 +216,12 @@ defmodule Membrane.Transcoder.IntegrationTest do %H264{} -> [:h264_encoder, :h264_decoder] %AAC{} -> [:aac_encoder, :aac_decoder] end - |> Enum.each(fn child_name -> - get_child_result = Testing.Pipeline.get_child_pid(pipeline, [:transcoder, child_name]) + |> Enum.each(fn base_name -> + get_child_result = + Testing.Pipeline.get_child_pid( + pipeline, + [:transcoder, :"output_0_#{base_name}"] + ) if transcoding_policy == :always do assert {:ok, child_pid} = get_child_result @@ -211,6 +242,7 @@ defmodule Membrane.Transcoder.IntegrationTest do 
output_stream_format: VP8, transcoding_policy: :never }) + |> via_out(Membrane.Pad.ref(:output, 0)) |> child(:sink, Testing.Sink) {:ok, supervisor, pipeline} = Testing.Pipeline.start(spec: []) @@ -228,27 +260,220 @@ defmodule Membrane.Transcoder.IntegrationTest do test "uses FFmpeg decoder and encoder when native_acceleration is :never" do pid = Testing.Pipeline.start_link_supervised!() - spec = + spec = [ child(%Membrane.File.Source{location: "./test/fixtures/video.h264"}) |> then(&Preprocessors.parse_h264/1) |> child(:transcoder, %Membrane.Transcoder{ output_stream_format: H264, transcoding_policy: :always, native_acceleration: :never - }) + }), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0)) |> child(:sink, Testing.Sink) + ] Testing.Pipeline.execute_actions(pid, spec: spec) assert_sink_stream_format(pid, :sink, _format) - assert {:ok, _pid} = Testing.Pipeline.get_child_pid(pid, [:transcoder, :h264_decoder]) - assert {:ok, _pid} = Testing.Pipeline.get_child_pid(pid, [:transcoder, :h264_encoder]) + assert {:ok, _pid} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_decoder]) + + assert {:ok, _pid} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_encoder]) + + assert {:error, :child_not_found} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_vk_h264_decoder]) + + assert {:error, :child_not_found} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_vk_h264_encoder]) + + Testing.Pipeline.terminate(pid) + end + + test "multivariant output: two outputs with different formats from H264 input" do + ref_h264 = transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, H264) + ref_h265 = transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, H265) + + pid = Testing.Pipeline.start_link_supervised!() + tmp = fn -> Path.join(System.tmp_dir!(), "mv_#{:erlang.unique_integer([:positive])}") end + h264_tmp = tmp.() + h265_tmp = tmp.() + + spec = [ + 
child(%Membrane.File.Source{location: "./test/fixtures/video.h264"}) + |> then(&Preprocessors.parse_h264/1) + |> child(:transcoder, Membrane.Transcoder), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0), options: [output_stream_format: H264]) + |> child(:sink_h264, %Membrane.File.Sink{location: h264_tmp}), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 1), options: [output_stream_format: H265]) + |> child(:sink_h265, %Membrane.File.Sink{location: h265_tmp}) + ] + + Testing.Pipeline.execute_actions(pid, spec: spec) + assert_end_of_stream(pid, :sink_h264, :input, 30_000) + assert_end_of_stream(pid, :sink_h265, :input, 30_000) + Testing.Pipeline.terminate(pid) + + mv_h264 = File.read!(h264_tmp) + mv_h265 = File.read!(h265_tmp) + File.rm(h264_tmp) + File.rm(h265_tmp) + + assert mv_h264 == ref_h264 + assert mv_h265 == ref_h265 + end + + test "multivariant output: two video outputs with different codecs (H264 and VP8)" do + ref_h264 = transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, H264) + ref_vp8 = transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, VP8) + + pid = Testing.Pipeline.start_link_supervised!() + tmp = fn -> Path.join(System.tmp_dir!(), "mv_#{:erlang.unique_integer([:positive])}") end + h264_tmp = tmp.() + vp8_tmp = tmp.() + + spec = [ + child(%Membrane.File.Source{location: "./test/fixtures/video.h264"}) + |> then(&Preprocessors.parse_h264/1) + |> child(:transcoder, Membrane.Transcoder), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0), options: [output_stream_format: H264]) + |> child(:sink_h264, %Membrane.File.Sink{location: h264_tmp}), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 1), options: [output_stream_format: VP8]) + |> child(:sink_vp8, %Membrane.File.Sink{location: vp8_tmp}) + ] + + Testing.Pipeline.execute_actions(pid, spec: spec) + assert_end_of_stream(pid, :sink_h264, :input, 
30_000) + Testing.Pipeline.terminate(pid) + + mv_h264 = File.read!(h264_tmp) + mv_vp8 = File.read!(vp8_tmp) + File.rm(h264_tmp) + File.rm(vp8_tmp) + + assert mv_h264 == ref_h264 + assert mv_vp8 == ref_vp8 + end + + test "multivariant output: per-output transcoding_policy is respected" do + pid = Testing.Pipeline.start_link_supervised!() + + spec = [ + child(:source, %FormatSource{format: %H264{alignment: :au, stream_structure: :annexb}}) + |> child(:transcoder, Membrane.Transcoder), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0), + options: [ + output_stream_format: %H264{alignment: :au, stream_structure: :avc1}, + transcoding_policy: :always + ] + ) + |> child(:sink_always, Testing.Sink), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 1), + options: [ + output_stream_format: %H264{alignment: :au, stream_structure: :avc1}, + transcoding_policy: :if_needed + ] + ) + |> child(:sink_if_needed, Testing.Sink) + ] + + Testing.Pipeline.execute_actions(pid, spec: spec) + + Process.sleep(500) + + # :always output should have encoder/decoder with "output_0_" prefix + assert {:ok, _pid} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_decoder]) + assert {:ok, _pid} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_encoder]) + + # :if_needed output should NOT have encoder/decoder (same format type) assert {:error, :child_not_found} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :vk_h264_decoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_1_h264_decoder]) assert {:error, :child_not_found} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :vk_h264_encoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_1_h264_encoder]) + + Testing.Pipeline.terminate(pid) + end + + test "multivariant output: three audio outputs with different formats" do + ref_aac = transcode_to_bytes("./test/fixtures/audio.aac", &Preprocessors.parse_aac/1, AAC) + ref_opus = 
transcode_to_bytes("./test/fixtures/audio.aac", &Preprocessors.parse_aac/1, Opus) + + ref_mp3 = + transcode_to_bytes("./test/fixtures/audio.aac", &Preprocessors.parse_aac/1, MPEGAudio) + + pid = Testing.Pipeline.start_link_supervised!() + tmp = fn -> Path.join(System.tmp_dir!(), "mv_#{:erlang.unique_integer([:positive])}") end + aac_tmp = tmp.() + opus_tmp = tmp.() + mp3_tmp = tmp.() + + spec = [ + child(%Membrane.File.Source{location: "./test/fixtures/audio.aac"}) + |> then(&Preprocessors.parse_aac/1) + |> child(:transcoder, Membrane.Transcoder), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0), options: [output_stream_format: AAC]) + |> child(:sink_aac, %Membrane.File.Sink{location: aac_tmp}), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 1), options: [output_stream_format: Opus]) + |> child(:sink_opus, %Membrane.File.Sink{location: opus_tmp}), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 2), options: [output_stream_format: MPEGAudio]) + |> child(:sink_mp3, %Membrane.File.Sink{location: mp3_tmp}) + ] + + Testing.Pipeline.execute_actions(pid, spec: spec) + assert_end_of_stream(pid, :sink_aac, :input, 30_000) + assert_end_of_stream(pid, :sink_opus, :input, 30_000) + assert_end_of_stream(pid, :sink_mp3, :input, 30_000) + Testing.Pipeline.terminate(pid) + + mv_aac = File.read!(aac_tmp) + mv_opus = File.read!(opus_tmp) + mv_mp3 = File.read!(mp3_tmp) + File.rm(aac_tmp) + File.rm(opus_tmp) + File.rm(mp3_tmp) + + assert mv_aac == ref_aac + assert mv_opus == ref_opus + assert mv_mp3 == ref_mp3 + end + + test "multivariant output: per-output options override bin-level options" do + pid = Testing.Pipeline.start_link_supervised!() + + spec = [ + child(%Membrane.File.Source{location: "./test/fixtures/video.h264"}) + |> then(&Preprocessors.parse_h264/1) + |> child(:transcoder, %Membrane.Transcoder{output_stream_format: H264}), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0)) + |> child(:sink_default, 
Testing.Sink), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 1), options: [output_stream_format: H265]) + |> child(:sink_override, Testing.Sink) + ] + + Testing.Pipeline.execute_actions(pid, spec: spec) + + assert_sink_stream_format(pid, :sink_default, format_default, 10_000) + assert format_default.__struct__ == H264 + + assert_sink_stream_format(pid, :sink_override, format_override, 10_000) + assert format_override.__struct__ == H265 Testing.Pipeline.terminate(pid) end @@ -257,27 +482,33 @@ defmodule Membrane.Transcoder.IntegrationTest do test "uses VKVideo decoder and encoder when native_acceleration is :if_available" do pid = Testing.Pipeline.start_link_supervised!() - spec = + spec = [ child(%Membrane.File.Source{location: "./test/fixtures/video.h264"}) |> then(&Preprocessors.parse_h264/1) |> child(:transcoder, %Membrane.Transcoder{ output_stream_format: H264, transcoding_policy: :always, native_acceleration: :if_available - }) + }), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0)) |> child(:sink, Testing.Sink) + ] Testing.Pipeline.execute_actions(pid, spec: spec) assert_sink_stream_format(pid, :sink, _format) - assert {:ok, _pid} = Testing.Pipeline.get_child_pid(pid, [:transcoder, :vk_h264_decoder]) - assert {:ok, _pid} = Testing.Pipeline.get_child_pid(pid, [:transcoder, :vk_h264_encoder]) + assert {:ok, _pid} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_vk_h264_decoder]) + + assert {:ok, _pid} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_vk_h264_encoder]) assert {:error, :child_not_found} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :h264_decoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_decoder]) assert {:error, :child_not_found} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :h264_encoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_encoder]) Testing.Pipeline.terminate(pid) end