Replies: 4 comments 21 replies
-
Hey @ndrean, thanks for creating the issue.
Is it possible to send the complete ffmpeg command for testing? I'll take a look post work today. |
Beta Was this translation helpful? Give feedback.
-
@akash-akya :ok =
ExCmd.stream!(
~w(#{state.ffmpeg} -i pipe:0 -frames:v 1 -f image2 -vcodec mjpeg -y priv/static/images/test_%02d.jpg),
input: File.stream!(path, [], 65_336)
)
|> Stream.run() Result is :ok ExCmd.stream!(
~w(#{state.ffmpeg} -i pipe:0 -frames:v 1 -f image2 -vcodec mjpeg -y priv/static/images/test_%02d.jpg),
input: File.stream!(frame, [], 65336)
) #=> %ExCmd.Stream{
process: #PID<0.4555.0>,
stream_opts: %{
input: {:enumerable,
%File.Stream{
path: "/var/folders/mz/91hbds1j23125yksdf67dcgm0000gn/T/plug-1719-tBhO/multipart-1719241495-896826904435-1",
modes: [:raw, :read_ahead, :binary],
line_or_bytes: 65336,
raw: true,
node: :nonode@nohost
}},
exit_timeout: :infinity
}
}
|> Stream.run() #=> :ok This works, almost (modulo you exclude "priv/static/images" from being watched by the hot_reload"), but the output file is continuously overwritten. I believe this is a single task, meaning that the FFmpeg is not kept alive, thus it does not auto-increment w/r to the pattern %02d. Is it possible for ExCmd let FFmpeg run as a controlled process, like Porcelain or erlexec does? As a side note, dialyzer gives a warning even if the command works. The function call will not succeed.
File.stream!(_frame :: any(), [], 65336)
breaks the contract
(Path.t(), :line | pos_integer(), [stream_mode()]) :: File.Stream.t() |
Beta Was this translation helpful? Give feedback.
-
Hey @ndrean, I was checking this and since in this case, we get chunks from user as separate {:ok, _} = Supervisor.start_link(
[{Task.Supervisor, name: FfmpegTest.TaskSupervisor}],
strategy: :one_for_one
)
defmodule FfmpegTest do
require Logger
@spec start_ffmpeg(String.t()) :: pid()
def start_ffmpeg(base_path) do
{:ok, ffmpeg_proc} =
Task.Supervisor.start_child(FfmpegTest.TaskSupervisor, fn ->
segment_pattern = Path.join(base_path, "temp_segment_%03d.ts")
playlist_path = Path.join(base_path, "temp_stream.m3u8")
cmd = build_ffmpeg_cmd(segment_pattern, playlist_path)
{:ok, ffmpeg_proc} = ExCmd.Process.start_link(cmd)
Logger.info("ffmpeg started. Start sending chunks")
received_to_stdin!(ffmpeg_proc)
Logger.info("All chunks are forwarded, waiting for the ffmpeg to finish")
await_exit!(ffmpeg_proc)
Logger.info("Ffmpeg finished successfully\n#{File.read!(playlist_path)}")
end)
ffmpeg_proc
end
defp received_to_stdin!(ffmpeg_proc) do
receive do
{:chunk, data} ->
:ok = ExCmd.Process.write(ffmpeg_proc, data)
received_to_stdin!(ffmpeg_proc)
:done ->
# indicate that we are done sending all chunks
:ok = ExCmd.Process.close_stdin(ffmpeg_proc)
end
end
# consume everything written to stdout (if there is any)
# and wait for cmd to exit
defp await_exit!(ffmpeg_proc) do
case ExCmd.Process.read(ffmpeg_proc) do
{:ok, data} ->
Logger.info("[stdout] #{inspect(data)}")
await_exit!(ffmpeg_proc)
:eof ->
{:ok, 0} = ExCmd.Process.await_exit(ffmpeg_proc)
end
end
defp build_ffmpeg_cmd(segment_pattern, playlist_path) do
[
"ffmpeg",
# Input from stdin (pipe)
["-i", "pipe:0"],
# sets the input frame rate to 20 frames per second.
# Adjusted to the value used in the browser:"canvas.captureStream(20)"
["-r", "20"],
# Video codec to use (libx264)
["-c:v", "libx264"],
# Duration of each segment in seconds
["-hls_time", "2"],
# Number of segments to keep in the playlist (rolling playlist)
["-hls_list_size", "5"],
# Option to delete old segments
["-hls_flags", "delete_segments"],
# Type of playlist (live for continuous update)
["-hls_playlist_type", "event"],
# Segment file naming pattern
["-hls_segment_filename", segment_pattern],
# Playlist file
playlist_path
]
|> List.flatten()
end
end
##################
ffmpeg_proc = FfmpegTest.start_ffmpeg("/Users/akash/repo/elixir/ex_cmd/temp")
# forward each chunk we receive in the request to ffmpeg
send(ffmpeg_proc, {:chunk, File.read!("/Users/akash/repo/elixir/ex_cmd/segment_000.ts")})
send(ffmpeg_proc, {:chunk, File.read!("/Users/akash/repo/elixir/ex_cmd/segment_001.ts")})
# say we are done when there are no more data
send(ffmpeg_proc, :done) Output
wdyt? |
Beta Was this translation helpful? Give feedback.
-
I think this could be another example for your documentation. I just added a stop button to test how FFmpeg is killed. Application.put_env(:sample, Example.Endpoint,
http: [ip: {127, 0, 0, 1}, port: 5001],
server: true,
live_view: [signing_salt: "aaaaaaaa"],
secret_key_base: String.duplicate("a", 64)
)
Mix.install([
{:plug_cowboy, "~> 2.7"},
{:jason, "~> 1.4"},
{:phoenix, "~> 1.7.0"},
{:phoenix_live_view, "~> 0.20.0"},
{:ex_cmd, "~> 0.12"},
])
defmodule Example.ErrorView do
def render(template, _), do: Phoenix.Controller.status_message_from_template(template)
end
defmodule Example.HomeLive do
use Phoenix.LiveView, layout: {__MODULE__, :live}
def mount(_params, _session, socket) do
{:ok, assign(socket, :count, 0)}
end
defp phx_vsn, do: Application.spec(:phoenix, :vsn)
defp lv_vsn, do: Application.spec(:phoenix_live_view, :vsn)
def render("live.html", assigns) do
~H"""
<script src={"https://cdn.jsdelivr.net/npm/phoenix@#{phx_vsn()}/priv/static/phoenix.min.js"}>
</script>
<script
src={"https://cdn.jsdelivr.net/npm/phoenix_live_view@#{lv_vsn()}/priv/static/phoenix_live_view.min.js"}
>
</script>
<script>
let liveSocket = new window.LiveView.LiveSocket("/live", window.Phoenix.Socket)
liveSocket.connect()
function run() {
navigator.mediaDevices.getUserMedia({video: true})
.then((stream)=> {
let video = document.getElementById("hls");
let send = true;
document.getElementById("stop").onclick = () => {
send = false;
}
video.srcObject = stream
let mediaRecorder = new MediaRecorder(stream);
mediaRecorder.ondataavailable = ({data}) => {
if (!send) return;
if (data.size > 0) {
console.log(data.size)
const file = new File([data], "chunk.webm", {
type: "video/webm",
});
const formData = new FormData();
formData.append("file", file);
fetch(`/upload`, {method: "POST",body: formData})
.then((res) => res.text())
.then(console.log)
}
}
mediaRecorder.start(1000)
})
}
run()
</script>
<%= @inner_content %>
"""
end
def render(assigns) do
~H"""
<video id="video" width="300" height="300" controls autoplay></video>
<button type="button" id="stop" phx-click="stop">STOP</button>
"""
end
def handle_event("stop", _, socket) do
FFmpeger.stop()
{:noreply, socket}
end
end
defmodule Example.Router do
use Phoenix.Router
import Phoenix.LiveView.Router
pipeline :browser do
plug(:accepts, ["html"])
plug Plug.Parsers, parsers: [:urlencoded, :multipart]
end
scope "/", Example do
pipe_through(:browser)
live("/", HomeLive, :index)
post("/upload", Post, :upload)
end
end
defmodule FFmpeger do
use GenServer
def start_link(_) do
GenServer.start_link(__MODULE__, {}, name: __MODULE__)
end
def enqueue(path) do
GenServer.call(__MODULE__, {:process, path})
end
def stop(), do: GenServer.call(__MODULE__, :stop)
def init(_) do
dir = System.tmp_dir()
playlist_path = Path.join(dir, "stream.m3u8")
segment_path = Path.join(dir, "segment_%03d.ts")
ffmpeg = System.find_executable("ffmpeg")
cmd = ~w(#{ffmpeg} -loglevel debug -hide_banner -i pipe:0 -r 20 -c:v libx264 -hls_time 2 -hls_list_size 5 -hls_flags delete_segments+append_list -hls_playlist_type event -hls_segment_filename #{segment_path} #{playlist_path})
{:ok, _pid} = ExCmd.Process.start_link(cmd)
end
def handle_call({:process, path}, _from, pid) do
data = File.read!(path)
ExCmd.Process.write(pid, data)
IO.puts "processed-----------------#{byte_size(data)}"
{:reply, :processed, pid}
end
def handle_call(:stop, _from, pid) do
IO.puts "stopping-----------------"
# tell ffmpeg that we are done sending chunks
:ok = ExCmd.Process.close_stdin(pid)
# Ideally this should not be needed, I'll fix soon in the master
:eof = ExCmd.Process.read(pid)
# wait for for ffmpeg to terminate, ensure it is terminated successfully
{:ok, 0} = ExCmd.Process.await_exit(pid)
{:stop, :shutdown, pid}
end
end
defmodule Example.Post do
use Phoenix.Controller
def upload(conn,%{"file" => %Plug.Upload{path: path}} ) do
:processed = FFmpeger.enqueue(path)
conn |> put_status(201) |> json(%{status: "ok"})
end
end
defmodule Example.Endpoint do
use Phoenix.Endpoint, otp_app: :sample
socket("/live", Phoenix.LiveView.Socket)
plug(Example.Router)
end
{:ok, _} = Supervisor.start_link([Example.Endpoint, FFmpeger], strategy: :one_for_one)
Process.sleep(:infinity) |
Beta Was this translation helpful? Give feedback.
-
This is not an issue but I would like to open a discussion on how to use ExCmd.
My use case is:
%PlugUpload{path: path}
.I currently
data = File.read!(path)
and pass the data to a process. It is a GenServer that initsPorcelain
with a command that runs FFmpeg: indeed, I run the FFmpeg using the stdin buffer, so I need it to be running. FFmpeg is building HLS segments and a playlist.My interest in
ExCmd
is that it seems I don't need a GenServer.I can
%Plug.Upload.give_away(FFmpegProcess, path, self())
and just pass the path.How would you do then? The following does not run. I have no external file to output to since FFmpeg builds the segments and the playlist, passed as arguments in the command.
ExCmd.stream!(__ffmpeg_cmd__, input: File.read!(path))
I tried to add
Enum.into("")
, but I get{:error, :epipe}
.I did not add a Livebook because I am not familiar with Kino, nor how would I run some Javascript code in a Livebook.
Beta Was this translation helpful? Give feedback.
All reactions