Source code for autoscrub

# Copyright 2017 Russell Anderson, Philip Starkey
#
# This file is part of autoscrub.
#
# autoscrub is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# autoscrub is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with autoscrub.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import print_function
try:
    from __version__ import __version__
except ImportError:
    # Version file has not been autogenerated from build process:
    __version__ = None

import os
import sys
import re
import six
import time
if six.PY3:
    from subprocess import Popen, call, PIPE, list2cmdline
    if sys.platform.startswith('win'):
        from subprocess import CREATE_NEW_PROCESS_GROUP
else:
    # backported subprocess module
    from subprocess32 import Popen, call, PIPE, list2cmdline
    if sys.platform.startswith('win'):
        from subprocess import CREATE_NEW_PROCESS_GROUP

import math
from functools import reduce
import signal


NUL = os.devnull

class AutoscrubException(Exception):
    pass

    
# setup signal handling to terminate subprocesses when
# this process is terminated
_previous_sigint = signal.getsignal(signal.SIGINT)
_previous_sigterm = signal.getsignal(signal.SIGTERM)
# _previous_sigkill = signal.getsignal(signal.SIGKILL)
_process_list = []
def _kill_autoscrub_processes(signum, frame):
    for p in _process_list:
        try:
            p.terminate()
        except Exception:
            pass

    if signum == signal.SIGINT and _previous_sigint not in [None, signal.SIG_IGN, signal.SIG_DFL]:
        _previous_sigint(signum, frame)
    if signum == signal.SIGTERM and _previous_sigterm not in [None, signal.SIG_IGN, signal.SIG_DFL]:
        _previous_sigterm(signum, frame)
    # if signum == signal.SIGKILL and _previous_sigkill not in [None, signal.SIG_IGN, signal.SIG_DFL]:
        # _previous_sigkill(signum, frame)
        
    # force terminate
    os._exit(1) 
        
signal.signal(signal.SIGINT, _kill_autoscrub_processes)
signal.signal(signal.SIGTERM, _kill_autoscrub_processes)
# signal.signal(signal.SIGKILL, _kill_autoscrub_processes)
    
__terminal_encoding = 'utf-8'
[docs]def set_terminal_encoding(encoding): """ Sets the encoding used for communicating with ffmpeg and ffprobe .. note::Only applies to Python 3. Sets the default value used to decode strings returned from :code:`subprocess.Popen`. This should match your system encoding, and is unlikely to need changing. """ global __terminal_encoding __terminal_encoding = encoding
__suppress_output = False
[docs]def suppress_ffmpeg_output(suppress): """suppresses the output to the terminal from FFmpeg and FFprobe. Output is printed by default unless this function is called with the argument :code:`True`. Call with :code:`False` to enable terminal output again. Arguments: suppress: If :code:`True`, ffmpeg and ffprobe output will be suppressed. """ global __suppress_output __suppress_output = bool(suppress)
def _agnostic_Popen(*args, **kwargs): # sensible defaults for kwargs if 'shell' not in kwargs: kwargs['shell'] = False # don't do this or else user won't be able to respond to ffmpeg prompts!! # if 'stdin' not in kwargs: # kwargs['stdin'] = PIPE if 'stderr' not in kwargs: kwargs['stderr'] = PIPE if 'stdout' not in kwargs: kwargs['stdout'] = PIPE # Launch the subprocess as a new subprocess group in order to # stop the subprocess from capturing the stdin if sys.platform.startswith('win'): if 'creationflags' not in kwargs: kwargs['creationflags'] = CREATE_NEW_PROCESS_GROUP else: if 'start_new_session' not in kwargs: kwargs['start_new_session'] = True p = Popen(*args, **kwargs) # add the process to a list incase we get a SIGTERM or SIGINT _process_list.append(p) # get the command passed to Popen if len(args) > 0: command = args[0] else: command = kwargs['args'] # store the command for use in exception handling later p.autoscrub_command = command return p def _agnostic_communicate(p, write_to_terminal = None, new_line_callback=None): """We only read from stderr because ffmpeg only prints to stderr Things get much more complicated if we need to read from both! """ # use module wide setting if not explicitly defined if write_to_terminal is None: write_to_terminal = not __suppress_output stderr = '' local_buffer = '' while True: out = p.stderr.read(100) # decode if it's a bytes string due to Python 3 if six.PY3: out = out.decode(__terminal_encoding) stderr += out # fancy code for nicely printing to the terminal # find last occurance of \r or \n pos1 = out.rfind("\r") pos2 = out.rfind("\n") pos = max(pos1, pos2) # if no new line character was printed, then add to the buffer if pos == -1: local_buffer += out # print everything in the buffer and the text before the character # save everything after the last \r|\n character in the local_buffer until we have the rest of the line else: local_buffer += out[:pos+1] # print to terminal if requested if write_to_terminal: sys.stderr.write(local_buffer) sys.stderr.flush() # send to callback is present if new_line_callback: new_line_callback(local_buffer) local_buffer = out[pos+1:] # if we hit end of file and the process has a return code, then break if len(out) < 100 and p.poll() != None: p.poll() # print anything left over in the local buffer if write_to_terminal: sys.stderr.write(local_buffer) sys.stderr.flush() break # we don't need to keep hold of the process anymore (for passing along SIGTERM and SIGINT) # since the process is done _process_list.remove(p) # if autoscrub did not return correctly if p.returncode != 0: # format the command if isinstance(p.autoscrub_command, six.string_types): command = p.autoscrub_command else: command = list2cmdline(p.autoscrub_command) # raise Exception raise AutoscrubException('[autoscrub:error] The command "{}" failed to execute and exited with return code {}'.format(command, p.returncode)) return '', stderr
[docs]def hhmmssd_to_seconds(s): """Convert a :code:`'[hh:]mm:ss[.d]'` string to seconds. The reverse of :func:`autoscrub.seconds_to_hhmmssd` Arguments: s: A string in the format :code:`'[hh:]mm:ss[.d]'`. The hours and decimal seconds are optional. Returns: The number of seconds as a float. """ assert isinstance(s, six.string_types) return reduce(lambda t60, x: t60 * 60 + x, map(float, s.split(':')))
[docs]def seconds_to_hhmmssd(t, decimal=True): """Convert a float (in seconds) to a :code:`'[hh:]mm:ss[.d]'` formatted string. The reverse of :func:`autoscrub.hhmmssd_to_seconds` Arguments: t: The number of seconds as a float Keyword Arguments: decimal: Whether to include the decimal component (default: True) Returns: The number of seconds as a :code:`'[hh:]mm:ss[.d]'` formatted string. """ t = float(t) # handle negative time s = '' if t < 0: s = '-' t *= -1 hours, remainder = divmod(t, 3600) minutes, seconds = divmod(remainder, 60) s += '{:02d}:{:02d}'.format(int(hours), int(minutes)) if not decimal: s+=':{:02d}'.format(int(seconds)) else: s+=':{:06.3f}'.format(seconds) return s
class _NewLineCallback(object): def __init__(self, duration=None, update_every_n_seconds=3, prefix=""): self.time_since_last_print = time.time() self.update_every_n_seconds = update_every_n_seconds self.start_time = time.time() self.duration = duration self.last_percentage = 0 self.prefix = (prefix + " ") if prefix else prefix def new_line_callback(self, line): try: if self.duration is None and "Duration:" in line: self.duration = hhmmssd_to_seconds(line.split("Duration: ")[-1].split(",")[0]) except Exception: pass # Only update every N seconds if time.time() - self.time_since_last_print < self.update_every_n_seconds: return if self.duration is None and "silence_start:" in line: try: silence_text = line.split("silence_start: ")[-1] sys.stdout.write("Found a new silent segment starting at %s"%silence_text) except Exception: pass else: self.time_since_last_print = time.time() elif "time=" in line: try: # get time text time_text = line.split('time=')[-1].split(' ')[0] #format it into seconds seconds = hhmmssd_to_seconds(time_text) percentage = min(float(seconds)/self.duration, 1)*100 time_remaining = (time.time()-self.start_time)/percentage*(100-percentage) if self.last_percentage != int(percentage): print("{}{:3d}% complete [{} remaining]".format(self.prefix, int(percentage), seconds_to_hhmmssd(time_remaining, decimal=False)), end="\r") self.last_percentage = int(percentage) except Exception: print("[autoscrub:warning] Could not determine percentage completion. Consider not suppressing the FFmpeg output.") else: self.time_since_last_print = time.time()
[docs]def ffprobe(filename): """Runs ffprobe on :code:`filename` and returns the log output from stderr. Arguments: filename: The filepath passed to ffprobe. Returns: The output of the ffprobe command. """ command = ['ffprobe', '-i', "%s" % filename] p = _agnostic_Popen(command, stdout=PIPE, stderr=PIPE) stdout, stderr = _agnostic_communicate(p) return stderr
[docs]def ffmpeg(filename, args=[], output_path=None, output_type=None, overwrite=None): """Runs ffmpeg on filename with the specified args. Arguments: filename: The filepath passed to the :code:`-i` option of ffmpeg. Keyword Arguments: args: A list of additional arguments to pass to ffmpeg. output_path: The filepath to append to the end of the ffmpeg command, designating the output file for the ffmpeg result. If left as the default (:code:`None`) it appends :code:`_processed` to the end of the filename and preserves input file extension unless :code:`output_type` is specified. output_type: Determines the output file type. Specify as a string containing the required file extension. This is ignored if :code:`output_path` is specified. overwrite: If :code:`True`, overwrites the :code:`output_path` with no prompt. If :code:`False`, the function will fail if the :code:`output_path` exists. Defaults to :code:`None` (prompts user for input). You must specify a value if you have suppressed terminal output with :func:`autoscrub.suppress_ffmpeg_output` Returns: The :code:`output_path` where the output of ffmpeg was written. """ command = ['ffmpeg', '-i', '%s' % filename] command += args if output_path is None: filename_prefix, file_extension = os.path.splitext(filename) if output_type is not None: file_extension = output_type output_path = filename_prefix + '_processed' + file_extension if __suppress_output and overwrite is None: raise RuntimeError("[autoscrub:error] If ffmpeg output is suppressed, you must specify the overwrite keyword argument or else ffmpeg will hang on user input.") if overwrite is not None: command += ['-y'] if overwrite==True else ['-n'] command += ['%s' % output_path] print(list2cmdline(command)) p = _agnostic_Popen(command) stdout, stderr = _agnostic_communicate(p) return output_path
[docs]def findDuration(log_output): """Finds the duration in seconds from ffprobe log_output. Arguments: log_output: The output of ffprobe, as returned by :func:`autoscrub.ffprobe`. Returns: A float containing duration in seconds or None if the duration could not be determined. """ matches = re.findall('Duration: +([\d\:\.]+)', log_output) if matches: duration = matches[0] seconds = hhmmssd_to_seconds(duration) return seconds else: return None
[docs]def getDuration(filename): """Runs ffprobe on filename and extracts duration in seconds. Arguments: filename: The filepath of the media file you wish to process. Returns: A float containing duration in seconds or None if the duration could not be determined. """ ffprobe_log = ffprobe(filename) return findDuration(ffprobe_log)
[docs]def findSampleRate(log_output): """Finds the audio sample rate in Hz from ffprobe log_output. Arguments: log_output: The output of ffprobe, as returned by :func:`autoscrub.ffprobe`. Returns: A float containing audio sample rate in Hz or None if the sample rate could not be determined. """ matches = re.findall(', ([\d]+) Hz', log_output) if matches: return int(matches[-1]) else: return None
[docs]def getSampleRate(filename): """Runs ffprobe on filename and extracts audio sample rate in Hz. Arguments: filename: The filepath of the media file you wish to process. Returns: A float containing audio sample rate in Hz or None if the sample rate could not be determined. """ ffprobe_log = ffprobe(filename) return findSampleRate(ffprobe_log)
[docs]def findSilences(log_output): """Extract silences from ffmpeg log_output when using the silencedetect filter. Arguments: log_output: The output of the ffmpeg silencedetect filter, as returned by :func:`autoscrub.getSilences`. Returns: a list of silence dictionaries, with keys:: silence_start: the timestamp of the detected silent interval in seconds silence_end: the timestamp of the detected silent interval in seconds silence_duration: duration of the silent interval in seconds """ matches = re.findall(r"(silence_[a-z]+): ([\-\d\.]+)", log_output) matches = [(k, float(v)) for (k, v) in matches] if matches: return [dict(matches[i:i + 3]) for i in six.moves.xrange(0, len(matches), 3)] else: return []
[docs]def getSilences(filename, input_threshold_dB=-18.0, silence_duration=2.0, save_silences=True): """Runs the ffmpeg filter silencedetect with the specified settings. Arguments: filename: the path to the video file to examine Keyword Arguments: input_threshold: instantaneous level (in dB) to detect silences with (default -18). silence_duration: seconds for which level mustn't exceed threshold to declare silence (default 2). save_silences: print the above timestamps to CSV file (default = True). Returns: a list of silence dictionaries, with keys:: silence_start: the timestamp of the detected silent interval in seconds silence_end: the timestamp of the detected silent interval in seconds silence_duration: duration of the silent interval in seconds """ command = ['ffmpeg', '-i', '%s'%filename, '-af', 'silencedetect=n=%.1fdB:d=%s'%(input_threshold_dB,silence_duration), '-f', 'null', '%s'%NUL] p = _agnostic_Popen(command, stdout=PIPE, stderr=PIPE) # Print a percentage complete message to the terminal if output is suppressed if __suppress_output: nlc = _NewLineCallback(update_every_n_seconds=2, prefix="[ffmpeg:silencedetect]") callback = nlc.new_line_callback else: callback = None stdout, stderr = _agnostic_communicate(p, new_line_callback=callback) start_time = callback.__self__.start_time if six.PY3 else callback.im_self.start_time seconds_taken = time.time() - start_time time_taken = seconds_to_hhmmssd(seconds_taken, decimal=False) print("[ffmpeg:silencedetect] Completed in {} ".format(time_taken)) silences = findSilences(stderr) if save_silences: filename_prefix, file_extension = os.path.splitext(filename) silence_path = '%s_silences.csv' % filename_prefix with open(silence_path, 'w') as f: for silence in silences: ti = silence['silence_start'] tf = silence['silence_end'] if 'silence_end' in silence else '' dt = silence['silence_duration'] if 'silence_duration' in silence else '' f.write('%s,%s,%s\n' % (ti, tf, dt)) return silences
[docs]def findLoudness(log_output): """Extract loudness (key, value) pairs from ffmpeg log_output when using the ebur128 filter. Arguments: log_output: The output of the ffmpeg ebur128 filter, as returned by :func:`autoscrub.getLoudness`. Returns: A loudness dictionary with keys:: I: integrated loudness in dBLUFS LRA: loudness range in dBLUFS LRA high: LRA low: Threshold: """ log_split = re.split(r"Parsed_ebur128.+", log_output) if len(log_split) > 1: summary = log_split[-1] matches = re.findall(r"([A-Z][A-Za-z ]*): +([\-\d\.]+)", summary) if matches: return dict([(k, float(v)) for (k, v) in matches]) return None
[docs]def getLoudness(filename): """Runs the ffmpeg ebur128 filter on filename. Arguments: filename: the path to the video file to examine. Returns: A loudness dictionary with keys:: I: integrated loudness in dBLUFS LRA: loudness range in dBLUFS LRA high: LRA low: Threshold: """ command = ['ffmpeg', '-i', '%s'%filename, '-c:v', 'copy', '-af', 'ebur128', '-f', 'null', '%s'%NUL] p = _agnostic_Popen(command, stdout=PIPE, stderr=PIPE) stdout, stderr = _agnostic_communicate(p) return findLoudness(stderr)
[docs]def matchLoudness(filename, target_lufs=-18, output_path=None, overwrite=None): """ Applies the volume ffmpeg filter in an attempt to change the audio volume to match the specified target. Arguments: filename: the path to the video file to examine. Keyword Arguments: target_lufs: The target LUFS for the output audio (default: -18) output_path: the filepath at which to write the resultant file. If no output path is specified, it follows the conventions of :func:`autoscrub.ffmpeg`. overwrite: If :code:`True`, overwrites the :code:`output_path` with no prompt. If :code:`False`, the function will fail if the :code:`output_path` exists. Defaults to :code:`None` (prompts user for input). You must specify a value if you have suppressed terminal output with :func:`autoscrub.suppress_ffmpeg_output` Returns: The :code:`output_path` where the output of ffmpeg was written. """ input_loudness = getLoudness(filename) input_lufs = input_loudness['I'] gain = target_lufs - input_lufs print('[autoscrub:info] Input loudness = %.1f dBLUFS; Gain to apply = %.1f dB' % (input_lufs, gain)) output_path = ffmpeg(filename, ['-c:v', 'copy', '-af', 'volume=%.1fdB' % gain], output_path, overwrite=overwrite) output_loudness = getLoudness(output_path) output_lufs = output_loudness['I'] print('[autoscrub:info] Output loudness = %.1f dBLUFS; Error = %.1f dB' % (output_lufs, target_lufs-output_lufs)) return output_path
[docs]def trim(input_path, tstart=0, tstop=None, output_path=None, overwrite=None, codec='copy', output_type=None): """Extract contents of input_path between tstart and tstop. Arguments: input_path: The path to the media file to process Keyword Arguments: tstart: A integer/float in seconds, or a '[hh:]mm:ss[.d]' string (default 0) tstop: A integer/float in seconds, or a '[hh:]mm:ss[.d]' string (default None) output_path: Defaults to appending '_trimmed' to input_path overwrite: If :code:`True`, overwrites the :code:`output_path` with no prompt. If :code:`False`, the function will fail if the :code:`output_path` exists. Defaults to :code:`None` (prompts user for input). You must specify a value if you have suppressed terminal output with :func:`autoscrub.suppress_ffmpeg_output` codec: Specify the codec to use in the encoding of the output file (default: copy). output_type: Determines the output file type. Specify as a string containing the required file extension. This is ignored if :code:`output_path` is specified. Returns: The :code:`output_path` where the output of ffmpeg was written. """ folder, filename = os.path.split(input_path) if not isinstance(tstart, six.string_types): tstart = '%.4f' % float(tstart) if tstop and not isinstance(tstop, six.string_types): tstop = '%.4f' % float(tstop) command = ['ffmpeg', '-i', '%s'%input_path] if hhmmssd_to_seconds(tstart) > 0: command += ['-ss', tstart] if tstop is not None: command += ['-to', tstop] if codec == 'copy': command += ['-c', 'copy'] else: command += codec if __suppress_output and overwrite is None: raise RuntimeError("[autoscrub:error] If ffmpeg output is suppressed, you must specify the overwrite keyword argument or else ffmpeg will hang on user input.") if overwrite is not None: command.append('-y' if overwrite==True else '-n') if output_path is None: filename_prefix, file_extension = os.path.splitext(input_path) if output_type is not None: file_extension = output_type output_path = filename_prefix + '_trimmed' + file_extension command.append(output_path) try: p = _agnostic_Popen(command) stdout, stderr = _agnostic_communicate(p) return output_path except Exception as e: print(e) return None
[docs]def trimSegments(input_path, trimpts, output_path=None, output_type=None, **kwargs): """Extract segments of a file using a list of :code:`(tstart, tstop)` tuples. Each segment is saved as a file of the same type as the original. Arguments: input_path: The path to the media file to process Keyword Arguments: trimpts: A list of :code:`(tstart, tstop)` tuples. See :func:`trim` for the supported formats of :code:`tstart` and :code:`tstop`. output_path: The folder in which to save the segments. Defaults to the folder 'temp' in the current working directory. output_type: Determines the output file type. Specify as a string containing the required file extension. kwargs: A list of additional keyword arguments to pass to :func:`trim`. Note that :code:`tstart`, :code:`tstop` and :code:`output_path` cannot be specified as additional keyword arguments as they are already specified explicitly when :code:`trimSegments` calls :code:`trim`. Returns: A list of paths to each segment created. """ folder, filename = os.path.split(input_path) filename_prefix, file_extension = os.path.splitext(filename) if output_type is not None: file_extension = output_type temp_folder = output_path if output_path else os.path.join(folder, 'temp') if not os.path.exists(temp_folder): os.mkdir(temp_folder) segment_paths = [] for i, (tstart, tstop) in enumerate(trimpts): segment_file = filename_prefix + '_%03i' % i + file_extension segment_path = os.path.join(temp_folder, segment_file) trim(input_path, tstart, tstop, segment_path, **kwargs) print('Trimmed segment %03i of %s (from %s to %s).' % (i, filename, tstart, tstop)) segment_paths.append(segment_path) return segment_paths
[docs]def concatFileList(concat_path, output_path, overwrite=None): """Take a file list for the ffmpeg concat demuxer and save to :code:`output_path`. The concat file (located at :code:`concat_path`) must contain lines of the form:: file '/path/to/file1' file '/path/to/file2' file '/path/to/file3' This avoids a re-encode and can be used with formats that do not support file level concatenation. Arguments: concat_path: the filepath containing the list of media files to concatenate output_path: the filepath at which to write the result of the concatenation Keyword Arguments: overwrite: If :code:`True`, overwrites the :code:`output_path` with no prompt. If :code:`False`, the function will fail if the :code:`output_path` exists. Defaults to :code:`None` (prompts user for input). You must specify a value if you have suppressed terminal output with :func:`autoscrub.suppress_ffmpeg_output` Returns: :code:`output_path` if successful or :code:`None`. """ command = ['ffmpeg', '-safe', '0', '-f', 'concat', '-i', '%s'%concat_path, '-c', 'copy'] if __suppress_output and overwrite is None: raise RuntimeError("[autoscrub:error] If ffmpeg output is suppressed, you must specify the overwrite keyword argument or else ffmpeg will hang on user input.") if overwrite is not None: command += ['-y'] if overwrite==True else ['-n'] command += ['%s' % output_path] print('[autoscrub] Running ffmpeg command:') print(list2cmdline(command)) try: p = _agnostic_Popen(command) stdout, stderr = _agnostic_communicate(p) return output_path except Exception as e: print(e) return None
[docs]def concatSegments(segment_paths, output_path=None, overwrite=None): """Concatenate a list of inputs (:code:`segment_paths`) using the ffmpeg concat demuxer. A concat file will be created of the form:: file '/path/to/file1' file '/path/to/file2' file '/path/to/file3' This avoids a re-encode and can be used with formats that do not support file level concatenation. Arguments: segment_paths: A list of filepaths to concatenate Keyword Arguments: output_path: the filepath at which to write the concat file. If left as the default (:code:`None`) it appends :code:`_concat` to the end of the filename and preserves the input file extension. overwrite: If :code:`True`, overwrites the :code:`output_path` with no prompt. If :code:`False`, the function will fail if the :code:`output_path` exists. Defaults to :code:`None` (prompts user for input). You must specify a value if you have suppressed terminal output with :func:`autoscrub.suppress_ffmpeg_output` Returns: The :code:`output_path` where the output of ffmpeg was written. """ folder, first_path = os.path.split(segment_paths[0]) first_prefix, file_extension = os.path.splitext(first_path) filename_prefix = '_'.join(first_prefix.split('_')[:-1]) concat_file = ''.join(filename_prefix) + '_concat.txt' concat_path = os.path.join(folder, concat_file) if not os.path.exists(concat_path) or overwrite: with open(concat_path, 'w') as f: f.write('\n'.join(["file '%s'" % path for path in segment_paths])) if not output_path: output_path = os.path.join(folder, filename_prefix + '_concat' + file_extension) return concatFileList(concat_path, output_path, overwrite)
[docs]def silenceFilterGraph(silences, factor, delay=0.25, audio_rate=44100, hasten_audio=None, silent_volume=1.0, v_in='[0:v]', a_in='[0:a]', v_out='[v]', a_out='[a]'): """Generate a filtergraph string (for processing with the -filter_complex flag of ffmpeg) using the trim and atrim filters to speed up periods in the video designated by a list of silence dictionaries, where each silence dictionary contains keys:: silence_start: the timestamp of the detected silent interval in seconds silence_end: the timestamp of the detected silent interval in seconds silence_duration: duration of the silent interval in seconds Arguments: silences: A list of silence dictionaries generated from getSilences factor: to speed up video during (a subset of) each silent interval Keyword arguments: delay: to omit from silent intervals when changing speed (default 0.25s) audio_rate: Sample rate of audio input (in Hz, default 44100) used in asetrate/aresample filters when hasten_audio=True hasten_audio: None, 'pitch' or 'tempo'. Speed up audio during silent segment by either increasing pitch (with asetrate and aresample filters) or tempo (with atempo filter). silent_volume: scale the volume during silent segments (default 1.0; no scaling) v_in: The named filtergraph video input pad. Defaults to :code:`[0:v]` (see the `FFmpeg filter documentation`_). a_in: The named filtergraph audio input pad. Defaults to :code:`[0:a]` (see the `FFmpeg filter documentation`_). v_out: The named filtergraph video output pad. Defaults to :code:`[v]` (see the `FFmpeg filter documentation`_). a_out: The named filtergraph audio output pad. Defaults to :code:`[a]` (see the `FFmpeg filter documentation`_). Returns: The generated filtergraph as a string .. _`FFmpeg filter documentation`: http://ffmpeg.org/ffmpeg-filters.html#Filtergraph-syntax-1 """ # Omit silences at the start/end of the file if len(silences) > 0: if 'silence_end' not in silences[-1]: silences = silences[:-1] if len(silences) > 0: if silences[0]['silence_start'] <= 0.: silences = silences[1:] # Timestamp of end of most recently processed segment tf_last = 0 # Container for calls to trim (video) and atrim (audio) filters vstrings = [] astrings = [] # String to call concat filter with concat_string = '' # Number of silences to process n = len(silences) # Number of segments (only because any beginning/end silences are gone) n_segs = 2*n + 1 # Generate 4 x filtergraph lines for each silence for i, s in enumerate(silences): # Number segments in filtergraph from 1 to n_segs i += 1 # Cast end of last segment to string t0 = '%.4f' % tf_last # Begin trim (& speedup) delay seconds after silence_start ti = '%.4f' % (s['silence_start'] + delay) # End trim (& speedup) delay seconds before silence_end tf = '%.4f' % (s['silence_end'] - delay) # Predicted duration of sped up segment based on above and factor ta = '%.4f' % (s['silence_start'] + delay + (s['silence_duration'] - 2*delay)/factor) # Trim video before this silence (regular speed) vstrings.append('%strim=%s:%s,setpts=PTS-STARTPTS[v%i];' % (v_in, t0, ti, (2*i-1))) # Trim video during this silence and speed up using setpts vstrings.append('%strim=%s:%s,setpts=(PTS-STARTPTS)/%i[v%i];' % (v_in, ti, tf, factor, (2*i))) # Trim video before this silence (regular speed) astrings.append('%satrim=%s:%s,asetpts=PTS-STARTPTS[a%i];' % (a_in, t0, ti, (2*i-1))) if hasten_audio == 'pitch': # Speed up audio during silent segment with asetrate and aresample filters (increases pitch) astrings.append('%satrim=%s:%s,asetpts=PTS-STARTPTS,asetrate=%i,aresample=%i,volume=%.3f[a%i];' % (a_in, ti, tf, (factor*audio_rate), audio_rate, silent_volume, (2*i))) elif hasten_audio == 'tempo': # speed up audio during silent segment with atempo (increases tempo) q = math.log(factor, 2) tempos = ['atempo=2.0']*int(q) if q != int(q): tempos.append('atempo=%.3f/%d'%(factor, 2**int(q))) tempo_str = ','.join(tempos) astrings.append('%satrim=%s:%s,asetpts=PTS-STARTPTS,%s,volume=%.3f[a%i];' % (a_in, ti, tf, tempo_str, silent_volume, (2*i))) else: # Use first 1/factor samples of silence for audio (no pitch increase) astrings.append('%satrim=%s:%s,asetpts=PTS-STARTPTS,volume=%.3f[a%i];' % (a_in, ti, ta, silent_volume, (2*i))) # Append these streams to the concat filter input concat_string += '[v%i][a%i][v%i][a%i]' % ((2*i-1), (2*i-1), (2*i), (2*i)) tf_last = s['silence_end'] - delay # Trim the final segment (regular speed) without specifying the end time vstrings.append('%strim=start=%.4f,setpts=PTS-STARTPTS[v%i];' % (v_in, tf_last, n_segs)) astrings.append('%satrim=start=%.4f,asetpts=PTS-STARTPTS[a%i];' % (a_in, tf_last, n_segs)) # Finish the concat filter call concat_string += '[v%i][a%i]concat=n=%i:v=1:a=1%s%s;' % (n_segs, n_segs, n_segs, v_out, a_out) # Collect lines of the filter script after the trim/atrim calls return '\n'.join(vstrings + astrings + [concat_string])
[docs]def resizeFilterGraph(v_in='[0:v]', width=1920, height=1080, pad=True, mode='decrease', v_out='[v]'): """Generate a filtergraph string (for processing with the -filter_complex flag of ffmpeg) using the scale and pad filters to scale & pad the video for width x height display, with optional padding. Keyword arguments: v_in: The named filtergraph video input pad. Defaults to :code:`[0:v]` (see `FFmpeg filter documentation`_). width: of display on which the output stream must fit (default 1920). height: of display on which the output stream must fit (default 1080). pad: add letter- or pillar-boxes to the output as required to fill width x height. mode: argument of ffmpeg scale filter (default 'decrease'). v_out: The named filtergraph video output pad. Defaults to :code:`[v]` (see `FFmpeg filter documentation`_). Returns: The generated filtergraph as a string. .. _`FFmpeg filter documentation`: http://ffmpeg.org/ffmpeg-filters.html#Filtergraph-syntax-1 """ vstrings = [] v_scaled = '[scaled]' if pad else v_out vstrings.append('%sscale=w=%i:h=%i:force_original_aspect_ratio=%s%s;' % (v_in, width, height, mode, v_scaled)) if pad: vstrings.append('%spad=%s:%s:(ow-iw)/2:(oh-ih)/2%s;' % (v_scaled, width, height, v_out)) return '\n'.join(vstrings)
[docs]def panGainAudioGraph(a_in='[0:a]', duplicate_ch='left', gain=0, a_out='[a]'): """Generate a filtergraph string (for processing with the -filter_complex flag of ffmpeg) using the pan and volume filters to duplicate audio from one stereo channel to another, and optionally change the volume by gain. Keyword arguments: a_in: The named filtergraph audio input pad. Defaults to :code:`[0:a]` (see `FFmpeg filter documentation`_). duplicate_ch: 'left', 'right', or None/False specify whether to duplicate a stereo channel of input audio stream (default 'left'). gain: to apply (in dB) to the audio stream using the volume filter. a_out: The named filtergraph audio output pad. Defaults to :code:`[a]` (see `FFmpeg filter documentation`_). Returns: The generated filtergraph as a string. .. _`FFmpeg filter documentation`: http://ffmpeg.org/ffmpeg-filters.html#Filtergraph-syntax-1 """ head = a_in tail = a_out + ';' astrings = [] if isinstance(duplicate_ch, six.string_types): if duplicate_ch.lower() == 'left': # Duplicate left channel of input on right channel astrings.append('pan=stereo|c0=c0|c1=c0') if duplicate_ch.lower() == 'right': # Duplicate right channel of input on left channel astrings.append('pan=stereo|c0=c1|c1=c1') if gain: astrings.append('volume=%.1fdB' % gain) if len(astrings): return head + ','.join(astrings) + tail else: return None
[docs]def generateFilterGraph(silences, factor, delay=0.25, rescale=True, pan_audio='left', gain=0, audio_rate=44100, hasten_audio=None, silent_volume=1.0): """Generate a filtergraph string (for processing with the -filter_complex flag of ffmpeg) using the trim and atrim filters to speed up periods in the video designated by a list of silence dictionaries. This function calls :func:`autoscrub.silenceFilterGraph`, :func:`autoscrub.resizeFilterGraph` and :func:`panGainAudioGraph` as appropriate. Arguments: silences: A list of silence dictionaries generated from :func:`autoscrub.getSilences` factor: to speed up video during (a subset of) each silent interval Keyword arguments: delay: to omit from silent intervals when changing speed (default 0.25s) rescale: Scale and pad the video (pillar- or letter-box as required) for 1920 x 1080 display (default True) pan_audio: 'left', 'right', or None/False specify whether to duplicate a stereo channel of input audio stream (default 'left') gain: in dB to apply when pan_audio is 'left' or 'right' audio_rate: Sample rate of audio input (in Hz, default 44100) used in asetrate/aresample filters when :code:`hasten_audio=True`. hasten_audio: None, 'pitch' or 'tempo'. Speed up audio during silent segment by either increasing pitch (with asetrate and aresample filters) or tempo (with atempo filter). silent_volume: scale the volume during silent segments (default 1.0; no scaling). Returns: The generated filtergraph as a string. """ filter_graph = silenceFilterGraph(silences, factor, audio_rate=audio_rate, hasten_audio=hasten_audio, silent_volume=silent_volume, delay=delay, v_out='[vn]' if rescale else '[v]', a_out='[an]' if gain or pan_audio else '[a]') if rescale is True: filter_graph += '\n' + resizeFilterGraph(v_in='[vn]') elif isinstance(rescale, list) or isinstance(rescale, tuple) and len(rescale) == 2: filter_graph += '\n' + resizeFilterGraph(v_in='[vn]', width=rescale[0], height=rescale[1]) elif isinstance(rescale, dict): assert 'width' in rescale and 'height' in rescale filter_graph += '\n' + resizeFilterGraph(v_in='[vn]', width=rescale['width'], height=rescale['height']) if pan_audio or gain: filter_graph += '\n' + panGainAudioGraph(a_in='[an]', duplicate_ch=pan_audio, gain=gain) if filter_graph.endswith(';'): filter_graph = filter_graph[:-1] return filter_graph
[docs]def writeFilterGraph(filter_script_path, silences, factor, **kwargs): """Generates a filtergraph string (using :func:`autoscrub.generateFilterGraph`) and writes it to a file. .. note::Overwrites the file if it already exists without prompting. Arguments: filter_script_path: Path to save the filter script . silences: A list of silence dictionaries generated from :func:`autoscrub.getSilences`. factor: to speed up video during (a subset of) each silent interval. Keyword Arguments: kwargs: Accepts keyword arguments of :func:`autoscrub.generateFilterGraph`. Returns: :code:`None` """ filter_graph = generateFilterGraph(silences, factor, **kwargs) with open(filter_script_path, 'w') as f: f.write(filter_graph)
[docs]def ffmpegComplexFilter(input_path, filter_script_path, output_path=NUL, run_command=True, overwrite=None, stderr_callback=None): """Executes the ffmpeg command and processes a complex filter Prepare and execute (if run_command) ffmpeg command for processing input_path with an ffmpeg filter_complex string (filtergraph) in filter_script_path, and save to output_path. As this requires re-encoding, video and audio settings are chosen to be compliant with YouTube's 'streamable content' specifications, available at (as of April 2017) https://support.google.com/youtube/answer/1722171 Arguments: input_path: The path to the video file to process. filter_script_path: The path to the filter script. Keyword Arguments: output_path: The path to save the processed video (defaults to os.devnull). run_command: If False, simply prepare and return the command for debugging or later use (default: True). overwrite: If :code:`True`, overwrites the :code:`output_path` with no prompt. If :code:`False`, the function will fail if the :code:`output_path` exists. Defaults to :code:`None` (prompts user for input). You must specify a value if you have suppressed terminal output with :func:`autoscrub.suppress_ffmpeg_output` stderr_callback: A reference to a python function to be called when a new line is printed to stderr by ffmpeg. Useful for monitoring the progress of ffmpeg in realtime. Defaults to None. Returns: the FFmpeg command sequence as a list (to be passed to :code:`subprocess.Popen` or formatted into a string for printing). """ header = ['ffmpeg', '-i', '%s'% input_path] youtube_video = ['-c:v', 'libx264', '-crf', '20', '-bf', '2', '-flags', '+cgop', '-g', '15', '-pix_fmt', 'yuv420p', '-movflags', '+faststart'] # -tune stillimage youtube_audio = ['-c:a', 'aac', '-r:a', '48000', '-b:a', '192k'] youtube_other = ['-strict', '-2'] filter_command = ['-filter_complex_script', '%s'%filter_script_path, '-map', '[v]', '-map', '[a]'] tail = ["%s" % output_path] if __suppress_output and overwrite is None: raise RuntimeError("[autoscrub:error] If ffmpeg output is suppressed, you must specify the overwrite keyword argument or else ffmpeg will hang on user input.") if overwrite is not None: if overwrite: tail.insert(0, '-y') else: tail.insert(0, '-n') command_list = header + youtube_video + youtube_audio + youtube_other + filter_command + tail if run_command: # print('Running ffmpeg command:') # print(list2cmdline(command_list)) p = _agnostic_Popen(command_list) stdout, stderr = _agnostic_communicate(p, new_line_callback=stderr_callback) # return output_path # else: # return list2cmdline(command_list) return command_list
if __name__ == '__main__': # Loudness normalisation target_lufs = -18.0 # Silence detection target_threshold_dB = -18.0 # should be close or equal to above silence_duration = 2.0 # should be greater than or equal to 2 (seconds) # Filepaths # input_path = 'lecture.mp4' # input_path = "C:\\Users\\russ\\Documents\\Teaching\\PHS3051\\LectureRecordings\\2017\\Lecture2\\ModernOpticsLecture2.trec" # input_path = "C:\\Users\\russ\\Documents\\Teaching\\PHS3051\\LectureRecordings\\2017\\Lecture3\\ModernOpticsLecture3.trec" # input_path = "C:\\Users\\rander\\Documents\\Teaching\\PHS3051Optics\\LectureRecordings\\2017\\Lecture4\\ModernOpticsLecture4.trec" # input_path = "C:\\Users\\rander\\Documents\\Teaching\\PHS3051Optics\\LectureRecordings\\2017\\Lecture5\\ModernOpticsLecture5.trec" input_path = "C:\\Users\\rander\\Documents\\Teaching\\PHS3051Optics\\LectureRecordings\\2017\\Lecture6\\ModernOpticsLecture6.trec" suffix = 'scrub' # Flags overwrite = True run_command = True rescale = True # pan_audio = False pan_audio = 'left' factor = 8 hasten_audio = 'tempo' # Implementation folder, filename = os.path.split(input_path) filename_prefix, file_extension = os.path.splitext(filename) output_path = '%s_%s.mp4' % (filename_prefix, suffix) filter_script_path = '%s.filter-script' % filename_prefix if folder is not '': os.chdir(folder) if not os.path.exists(filter_script_path) or overwrite: print('[autoscrub] Processing %s' % filename) print('[ffprobe] Getting audio sample rate...') input_sample_rate = getSampleRate(filename) # print("Measured sample rate = %d Hz"%input_sample_rate) print('[ffmpeg:ebur128] Checking loudness of file...') loudness = getLoudness(filename) input_lufs = loudness['I'] gain = target_lufs - input_lufs # Apply gain correction if pan_audio is used (when one stereo channel is silent) if pan_audio: gain -= 3 input_threshold_dB = input_lufs + target_threshold_dB - target_lufs print('[autoscrub:info] Measured loudness = %.1f dBLUFS; Silence threshold = %.1f dB; Gain to apply = %.1f dB' % (input_lufs, input_threshold_dB, gain)) print('[ffmpeg:silencedetect] Searching for silence...') silences = getSilences(filename, input_threshold_dB, silence_duration) durations = [s['silence_duration'] for s in silences if 'silence_duration' in s] mean_duration = sum(durations)/len(durations) print('[autoscrub:info] Deteceted %i silences of average duration %.1f seconds.' % (len(silences), mean_duration)) print('[autoscrub] Generating ffmpeg filter_complex script...') writeFilterGraph(filter_script_path, silences, factor=factor, audio_rate=input_sample_rate, pan_audio=pan_audio, gain=gain, rescale=rescale, hasten_audio=hasten_audio) else: print('[autoscrub:info] Using existing filter_complex script....') print('[autoscrub:info] Required ffmpeg command:') result = ffmpegComplexFilter(input_path, filter_script_path, output_path, run_command, overwrite)