使用命名管道(仅限 Linux):
当有两个或更多输入流(需要从内存缓冲区通过管道传输)时,需要命名管道。
使用命名管道一点也不简单......
从 FFmpeg 的角度来看,命名管道就像(不可查找的)输入文件。
在 Python 中使用命名管道(在 Linux 中):
假设 pipe1 是“命名管道”的名称(例如 pipe1 = "audio_pipe1")。
-
创建一个“命名管道”:
os.mkfifo(pipe1)
-
将管道打开为“只写”文件:
fd_pipe = os.open(pipe1, os.O_WRONLY)  # fd_pipe is a file descriptor (an integer).
-
将数据分成小块写入管道。
根据这篇帖子,大多数 Linux 系统中管道的默认缓冲区大小是 64 KB(65536 字节)。
由于数据大于65536字节,我们需要将数据分小块写入管道。
我决定使用 1024 字节的任意块大小。
管道写操作是一个“阻塞”操作。
我通过使用一个“写入器”(writer)线程解决了这个问题:
def writer(data, pipe_name, chunk_size):
    """Write ``data`` to the named pipe ``pipe_name`` in small chunks.

    Meant to be used as a thread target, because writing to a pipe is a
    blocking operation.

    Args:
        data: Bytes-like buffer to send through the pipe.
        pipe_name: Path of an existing named pipe (FIFO).
        chunk_size: Maximum number of bytes handed to each ``os.write`` call.
    """
    # Open the pipe as a "low level IO" file (write only).
    fd_pipe = os.open(pipe_name, os.O_WRONLY)  # fd_pipe is a file descriptor (an integer)
    try:
        for i in range(0, len(data), chunk_size):
            chunk = data[i:i + chunk_size]
            # os.write returns the number of bytes actually written, which may
            # be less than requested; keep writing until the chunk is consumed.
            while chunk:
                written = os.write(fd_pipe, chunk)
                chunk = chunk[written:]
    finally:
        # The descriptor is local to this function, so it has to be closed
        # here - also when a write fails (e.g. the reader went away).
        os.close(fd_pipe)
-
关闭管道:
os.close(fd_pipe)
-
删除(取消链接)命名管道:
os.unlink(pipe1)
下面是沿用上一篇帖子的示例代码,使用两个命名管道:
import subprocess
import os
from threading import Thread
def create_samp():
    """Download and encode the two MP3 samples used by the demo.

    FFmpeg is executed twice; each run writes MP3 data to its stdout pipe,
    which is captured into a bytes object.

    Returns:
        A tuple ``(sample1, sample2)`` of MP3-encoded bytes.

    Raises:
        subprocess.CalledProcessError: If FFmpeg exits with a non-zero status
            (for example when a URL cannot be read). Failing here avoids
            passing empty or truncated data on to the mixing step.
    """
    # Read audio stream from https://freesound.org/data/previews/186/186942_2594536-hq.mp3
    # Apply adelay audio filter.
    # Encode the audio in mp3 format.
    # FFmpeg output is passed to stdout pipe, and stored in sample bytes array.
    sample1 = subprocess.run(
        ["ffmpeg", "-i", "https://freesound.org/data/previews/186/186942_2594536-hq.mp3",
         "-af", "adelay=15000|15000", "-f", "mp3", "pipe:"],
        stdout=subprocess.PIPE, check=True).stdout

    # Read second audio sample from https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3
    sample2 = subprocess.run(
        ["ffmpeg", "-i", "https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3",
         "-f", "mp3", "pipe:"],
        stdout=subprocess.PIPE, check=True).stdout

    return sample1, sample2
def writer(data, pipe_name, chunk_size):
    """Write ``data`` to the named pipe ``pipe_name`` in small chunks, then close it.

    Meant to be used as a thread target, because writing to a pipe is a
    blocking operation.

    Args:
        data: Bytes-like buffer to send through the pipe.
        pipe_name: Path of an existing named pipe (FIFO).
        chunk_size: Maximum number of bytes handed to each ``os.write`` call.
    """
    # Open the pipe like a file (write only).
    fd_pipe = os.open(pipe_name, os.O_WRONLY)  # fd_pipe is a file descriptor (an integer)
    try:
        for i in range(0, len(data), chunk_size):
            chunk = data[i:i + chunk_size]
            # os.write returns the number of bytes actually written, which may
            # be less than requested; keep writing until the chunk is consumed.
            while chunk:
                written = os.write(fd_pipe, chunk)
                chunk = chunk[written:]
    finally:
        # Close the pipe even when a write fails (e.g. the reader went away),
        # so the descriptor is never leaked.
        os.close(fd_pipe)
def record(samp1, samp2):
    """Mix two in-memory audio buffers with FFmpeg and store the result in output.mp3.

    Each buffer is passed to FFmpeg through its own named pipe
    ("audio_pipe1" and "audio_pipe2", created relative to the current
    working directory) by a dedicated writer thread.

    Args:
        samp1: Bytes of the first audio stream.
        samp2: Bytes of the second audio stream.

    Raises:
        FileExistsError: If one of the pipe names already exists.
    """
    # Names of the "Named pipes"
    pipe1 = "audio_pipe1"
    pipe2 = "audio_pipe2"

    created = []  # Pipes that were actually created and must be removed again.
    try:
        # Create "named pipes".
        for pipe in (pipe1, pipe2):
            os.mkfifo(pipe)
            created.append(pipe)

        # Open FFmpeg as sub-process
        # Use two audio input streams:
        # 1. Named pipe: "audio_pipe1"
        # 2. Named pipe: "audio_pipe2"
        # Merge the two audio streams using amix audio filter.
        # Store the result to output file: output.mp3
        # Note: '-f', 'mp3' precedes the first "-i", so it applies to the first
        # input only (FFmpeg options bind to the next file on the command line).
        process = subprocess.Popen(["ffmpeg", "-y", '-f', 'mp3',
                                    "-i", pipe1,
                                    "-i", pipe2,
                                    "-filter_complex", "amix=inputs=2:duration=longest", "output.mp3"],
                                   stdin=subprocess.PIPE)

        # Initialize two "writer" threads (each writer writes data to named pipe in chunks of 1024 bytes).
        thread1 = Thread(target=writer, args=(samp1, pipe1, 1024))  # thread1 writes samp1 to pipe1
        thread2 = Thread(target=writer, args=(samp2, pipe2, 1024))  # thread2 writes samp2 to pipe2

        # Start the two threads
        thread1.start()
        thread2.start()

        # Wait for the two writer threads to finish
        thread1.join()
        thread2.join()

        process.wait()  # Wait for FFmpeg sub-process to finish
    finally:
        # Remove the "named pipes" even when something above failed; a pipe
        # left behind would make os.mkfifo fail on the next run.
        for pipe in created:
            os.unlink(pipe)
# Script driver: build the two audio samples, then mix them with record().
sampl1, sampl2 = create_samp()
record(sampl1, sampl2)
更新:
使用类的相同解决方案:
使用类(“NamedPipeWriter”类)更优雅一点。
该类继承 Thread 类,并重写 run 方法。
您可以创建多个对象的列表,并在循环中迭代它们(而不是为每个新输入流重复代码)。
这是使用类的相同解决方案:
import subprocess
import os
import stat
from threading import Thread
def create_samp():
    """Download and encode the two MP3 samples used by the demo.

    FFmpeg is executed twice; each run writes MP3 data to its stdout pipe,
    which is captured into a bytes object.

    Returns:
        A tuple ``(sample1, sample2)`` of MP3-encoded bytes.

    Raises:
        subprocess.CalledProcessError: If FFmpeg exits with a non-zero status
            (for example when a URL cannot be read). Failing here avoids
            passing empty or truncated data on to the mixing step.
    """
    # Read audio stream from https://freesound.org/data/previews/186/186942_2594536-hq.mp3
    # Apply adelay audio filter.
    # Encode the audio in mp3 format.
    # FFmpeg output is passed to stdout pipe, and stored in sample bytes array.
    sample1 = subprocess.run(
        ["ffmpeg", "-i", "https://freesound.org/data/previews/186/186942_2594536-hq.mp3",
         "-af", "adelay=15000|15000", "-f", "mp3", "pipe:"],
        stdout=subprocess.PIPE, check=True).stdout

    # Read second audio sample from https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3
    sample2 = subprocess.run(
        ["ffmpeg", "-i", "https://cdns-preview-b.dzcdn.net/stream/c-b0b684fe962f93dc43f1f7ea493683a1-3.mp3",
         "-f", "mp3", "pipe:"],
        stdout=subprocess.PIPE, check=True).stdout

    return sample1, sample2
class NamedPipeWriter(Thread):
    """Write data (in small chunks) to a named pipe using a thread."""

    def __init__(self, pipe_name, data, chunk_size=1024):
        """Store the pipe name and the data to be written.

        Args:
            pipe_name: Path of an existing named pipe (FIFO).
            data: Bytes-like buffer to send through the pipe.
            chunk_size: Maximum number of bytes handed to each ``os.write``
                call (defaults to 1024).
        """
        super().__init__()
        self._pipe_name = pipe_name
        self._chunk_size = chunk_size
        self._data = data

    def run(self):
        """Open the pipe, write the data in small chunks and close the pipe."""
        chunk_size = self._chunk_size
        data = self._data

        # Open the pipe like a file (write only).
        fd_pipe = os.open(self._pipe_name, os.O_WRONLY)  # fd_pipe is a file descriptor (an integer)
        try:
            for i in range(0, len(data), chunk_size):
                chunk = data[i:i + chunk_size]
                # os.write returns the number of bytes actually written, which
                # may be less than requested; keep writing until the chunk is
                # consumed.
                while chunk:
                    written = os.write(fd_pipe, chunk)
                    chunk = chunk[written:]
        finally:
            # Close the pipe even when a write fails, so the descriptor is
            # never leaked.
            os.close(fd_pipe)
def _ensure_fifo(path):
    """Create a named pipe at ``path`` unless one already exists there."""
    try:
        os.mkfifo(path)
    except FileExistsError:
        # Something already occupies the name: reuse it only if it really is a
        # FIFO, otherwise let the error propagate.
        if not stat.S_ISFIFO(os.stat(path).st_mode):
            raise


def record(samp1, samp2):
    """Mix two in-memory audio buffers with FFmpeg and store the result in output.mp3.

    Each buffer is passed to FFmpeg through its own named pipe
    ("audio_pipe1" and "audio_pipe2", relative to the current working
    directory) by a dedicated NamedPipeWriter thread.

    Args:
        samp1: Bytes of the first audio stream.
        samp2: Bytes of the second audio stream.

    Raises:
        FileExistsError: If one of the pipe names exists but is not a FIFO.
    """
    # Names of the "Named pipes"
    pipe1 = "audio_pipe1"
    pipe2 = "audio_pipe2"

    ready = []  # Pipes that are in place and must be removed at the end.
    try:
        # Create "named pipes" (only if they do not exist yet).
        for pipe in (pipe1, pipe2):
            _ensure_fifo(pipe)
            ready.append(pipe)

        # Open FFmpeg as sub-process
        # Use two audio input streams:
        # 1. Named pipe: "audio_pipe1"
        # 2. Named pipe: "audio_pipe2"
        # Merge the two audio streams using amix audio filter.
        # Store the result to output file: output.mp3
        # Note: '-f', 'mp3' precedes the first "-i", so it applies to the first
        # input only (FFmpeg options bind to the next file on the command line).
        process = subprocess.Popen(["ffmpeg", "-y", '-f', 'mp3',
                                    "-i", pipe1,
                                    "-i", pipe2,
                                    "-filter_complex", "amix=inputs=2:duration=longest", "output.mp3"],
                                   stdin=subprocess.PIPE)

        # Initialize two "writer" threads (each writer writes data to named pipe in chunks of 1024 bytes).
        pipe_writers = [NamedPipeWriter(pipe1, samp1), NamedPipeWriter(pipe2, samp2)]

        # Start the two threads
        for pipe_writer in pipe_writers:
            pipe_writer.start()

        # Wait for BOTH writer threads to finish (looping over the list avoids
        # joining the same thread twice by mistake).
        for pipe_writer in pipe_writers:
            pipe_writer.join()

        process.wait()  # Wait for FFmpeg sub-process to finish
    finally:
        # Remove the "named pipes", also when something above failed.
        for pipe in ready:
            os.unlink(pipe)
# Script driver: build the two audio samples, then mix them with record().
sampl1, sampl2 = create_samp()
record(sampl1, sampl2)
注意:
- 该代码在 Ubuntu 18.04(虚拟机)中进行了测试。