236 lines
9.2 KiB
Python
236 lines
9.2 KiB
Python
import glob
import json
import os
import re
import subprocess
import sys
import tempfile
|
|
|
|
def probe_streams(video_path):
    """Probe a media file and return a description of its streams.

    Runs ``ffprobe`` asking for the index, codec type and codec name of
    every stream, formatted as JSON.

    Args:
        video_path: Path of the media file to inspect.

    Returns:
        The list found under the ``streams`` key of ffprobe's JSON output
        (one dict per stream).  An empty list is returned when ffprobe
        cannot be started or when its output is not the expected JSON
        object.
    """
    cmd = [
        'ffprobe', '-v', 'error', '-show_entries',
        'stream=index,codec_type,codec_name',
        '-of', 'json', video_path
    ]
    try:
        # Decode explicitly as UTF-8 with replacement: with the locale
        # default codec, undecodable bytes in ffprobe's captured output
        # (e.g. a non-ASCII file name in an error message) would raise
        # UnicodeDecodeError inside subprocess.run.
        result = subprocess.run(cmd, capture_output=True,
                                encoding='utf-8', errors='replace')
    except OSError as exc:
        # Typically FileNotFoundError: the ffprobe executable is missing.
        print(f"Error: could not run ffprobe: {exc}")
        return []

    try:
        data = json.loads(result.stdout)
    except ValueError:
        # json.JSONDecodeError is a ValueError; raised on empty/invalid output.
        return []

    if not isinstance(data, dict):
        return []
    streams = data.get('streams', [])
    return streams if isinstance(streams, list) else []
|
|
|
|
def get_video_duration(video_path):
    """Return the container duration of a video, in seconds.

    Runs ``ffprobe`` asking only for ``format=duration`` printed as a bare
    value, and parses that value as a float.

    Args:
        video_path: Path of the baseline video to measure.

    Returns:
        The duration as a float, or ``None`` when ffprobe cannot be started
        or its output cannot be parsed as a number.  A message is printed
        in either failure case.
    """
    print(f"Analyzing baseline duration: {video_path}...")
    cmd = [
        'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1', video_path
    ]
    try:
        # Explicit UTF-8 with replacement so that undecodable bytes in
        # ffprobe's captured output cannot raise UnicodeDecodeError.
        result = subprocess.run(cmd, capture_output=True,
                                encoding='utf-8', errors='replace')
    except OSError as exc:
        # Typically FileNotFoundError: the ffprobe executable is missing.
        print(f"Error: could not run ffprobe: {exc}")
        return None

    try:
        duration = float(result.stdout.strip())
    except ValueError:
        print("Error: Could not determine the duration of the baseline video.")
        detail = result.stderr.strip()
        if detail:
            # Surface ffprobe's own explanation instead of discarding it.
            print(f"  ffprobe: {detail}")
        return None

    print(f"Baseline duration found: {duration} seconds.")
    return duration
|
|
|
|
def get_next_keyframe(input_video, cut_timestamp):
    """Find the first video keyframe strictly after ``cut_timestamp``.

    Runs ``ffprobe`` on the video streams with non-key frames skipped,
    reading a 180-second interval that starts at ``cut_timestamp``, and
    scans the reported frames in order.

    Args:
        input_video: Path of the video to inspect.
        cut_timestamp: Cut position in seconds.

    Returns:
        The timestamp (float, seconds) of the first reported frame whose
        time is greater than ``cut_timestamp``, or ``None`` when no such
        frame is reported, ffprobe cannot be started, or its output is not
        valid JSON.
    """
    cmd = [
        'ffprobe', '-v', 'quiet', '-select_streams', 'v',
        '-skip_frame', 'nokey', '-show_frames',
        '-show_entries', 'frame=pkt_pts_time,pkt_dts_time,best_effort_timestamp_time',
        '-of', 'json', '-read_intervals', f'{cut_timestamp}%+180', input_video
    ]
    try:
        # Explicit UTF-8 with replacement so that undecodable bytes in the
        # captured output cannot raise UnicodeDecodeError.
        result = subprocess.run(cmd, capture_output=True,
                                encoding='utf-8', errors='replace')
    except OSError as exc:
        # Typically FileNotFoundError: the ffprobe executable is missing.
        print(f"Error: could not run ffprobe: {exc}")
        return None

    try:
        data = json.loads(result.stdout)
    except ValueError:
        # Empty or malformed output (e.g. ffprobe failed on this file).
        return None
    frames = data.get('frames', []) if isinstance(data, dict) else []

    # Timestamp fields to try, in order of preference.
    time_keys = ('best_effort_timestamp_time', 'pkt_pts_time', 'pkt_dts_time')

    for frame in frames:
        keyframe_time = None
        for key in time_keys:
            try:
                keyframe_time = float(frame.get(key))
            except (TypeError, ValueError):
                # Field missing (None) or not numeric (e.g. "N/A"):
                # fall through to the next candidate field.
                continue
            break

        if keyframe_time is not None and keyframe_time > cut_timestamp:
            return keyframe_time
    return None
|
|
|
|
def _ffmpeg_path(path):
    """Return *path* as an absolute path that uses forward slashes."""
    # Absolute paths with forward slashes: Windows FFmpeg's concat demuxer
    # cannot resolve relative paths when the CWD contains Unicode characters.
    return os.path.abspath(path).replace(os.sep, '/')


def _concat_line(path):
    """Return one ``file '...'`` directive for an FFmpeg concat list."""
    # Inside single quotes FFmpeg treats everything literally, so a quote in
    # the path must be written as: close quote, escaped quote, reopen quote.
    escaped = _ffmpeg_path(path).replace("'", "'\\''")
    return f"file '{escaped}'\n"


def _run_ffmpeg(args):
    """Run ``ffmpeg -y -v error <args>``; return ``(succeeded, stderr_text)``."""
    # Explicit UTF-8 with replacement so that undecodable bytes in ffmpeg's
    # captured messages cannot raise UnicodeDecodeError.
    res = subprocess.run(['ffmpeg', '-y', '-v', 'error', *args],
                         capture_output=True,
                         encoding='utf-8', errors='replace')
    return res.returncode == 0, res.stderr.strip()


def smart_cut(input_video, output_video, cut_timestamp):
    """Trim the start of a video at an exact timestamp with minimal re-encoding.

    Steps:
      0. Probe the input and skip it if it has no video stream.
      1. Re-encode (libx264, CRF 18, no audio) only the short span from
         ``cut_timestamp`` to the next keyframe.
      2. Stream-copy the video from that keyframe to the end (no audio).
      3. Concatenate both video parts and mux them with the input's audio
         (if any), stream-copied and seeked to ``cut_timestamp``.

    Intermediate files live in a uniquely named temporary directory created
    next to the output file and removed afterwards, so they cannot collide
    with (or delete) media files in the working folder.

    Args:
        input_video: Path of the source video.
        output_video: Path of the file to write (overwritten if present).
        cut_timestamp: Position in seconds where the output should start.

    Returns:
        None. Progress, skip reasons and errors are reported via ``print``.
    """
    print(f"Processing: {input_video}...")

    # 0. Pre-flight: check the file actually has a video stream
    streams = probe_streams(input_video)
    video_streams = [s for s in streams if s.get('codec_type') == 'video']
    audio_streams = [s for s in streams if s.get('codec_type') == 'audio']
    if not video_streams:
        # Formatting (rather than '+') tolerates streams without a codec_type.
        detected = [f"{s.get('codec_type', '?')}/{s.get('codec_name', '?')}"
                    for s in streams]
        print(f"Skipping {input_video}: No video stream found. "
              f"Streams detected: {detected}")
        return
    print(f"  Streams: video={[s.get('codec_name') for s in video_streams]}, "
          f"audio={[s.get('codec_name') for s in audio_streams]}")

    next_keyframe = get_next_keyframe(input_video, cut_timestamp)
    if next_keyframe is None:
        print(f"Skipping {input_video}: Could not find a keyframe after {cut_timestamp}s.")
        return

    source = _ffmpeg_path(input_video)
    target = _ffmpeg_path(output_video)
    out_dir = os.path.dirname(target)

    try:
        os.makedirs(out_dir, exist_ok=True)
        # Private scratch directory beside the output; removed on exit from
        # the ``with`` block whether or not the steps below succeed.
        with tempfile.TemporaryDirectory(prefix="smart_cut_", dir=out_dir) as work_dir:
            part1 = _ffmpeg_path(os.path.join(work_dir, "part1.mp4"))
            part2 = _ffmpeg_path(os.path.join(work_dir, "part2.mp4"))
            concat_list = _ffmpeg_path(os.path.join(work_dir, "concat_list.txt"))

            # 1. Re-encode the tiny segment (from exact cut to next keyframe).
            # Audio is removed (-an) to prevent sync/overlap issues during
            # concatenation.
            # NOTE(review): the segment is always encoded with libx264 while
            # the remainder keeps the source codec — presumably sources are
            # expected to be H.264; confirm.
            ok, err = _run_ffmpeg([
                '-i', source,
                '-ss', str(cut_timestamp), '-to', str(next_keyframe),
                '-c:v', 'libx264', '-crf', '18', '-an', part1
            ])
            if not ok:
                print(f"  Step 1 failed (re-encode segment): {err}")
                return

            # 2. Copy the rest of the video (from next keyframe to the end).
            # Audio is removed (-an) here too.
            ok, err = _run_ffmpeg([
                '-ss', str(next_keyframe), '-i', source,
                '-c:v', 'copy', '-an', part2
            ])
            if not ok:
                print(f"  Step 2 failed (copy remainder): {err}")
                return

            # Validate that both parts have content
            for label, fp in (("Part 1", part1), ("Part 2", part2)):
                if not os.path.exists(fp) or os.path.getsize(fp) == 0:
                    print(f"  Skipping {input_video}: {label} is empty — source encoding may be incompatible.")
                    return

            # 3. Concatenate video parts and cleanly mux with the original
            # audio, taken from the input seeked to the cut position.
            with open(concat_list, 'w', encoding='utf-8') as f:
                f.write(_concat_line(part1) + _concat_line(part2))

            ok, err = _run_ffmpeg([
                '-f', 'concat', '-safe', '0', '-i', concat_list,
                '-ss', str(cut_timestamp), '-i', source,
                '-map', '0:v', '-map', '1:a?',
                '-c:v', 'copy', '-c:a', 'copy', target
            ])
            if not ok:
                print(f"Error processing {input_video}. FFmpeg failed: {err}")
                return

            print(f"Success! Saved to {output_video}")
    except OSError as exc:
        # ffmpeg executable missing, or a filesystem error while creating,
        # writing or removing the scratch files.
        print(f"Error processing {input_video}: {exc}")
|
|
|
|
def _find_head_files():
    """Locate the head (baseline) file in the current directory.

    Priority 1 is an exact ``i.mp4``; priority 2 is the ``i<number>.mp4``
    file with the smallest number.

    Returns:
        ``(baseline_file, head_files)`` where ``head_files`` lists the names
        to exclude from processing, or ``(None, [])`` if nothing matches.
    """
    if os.path.exists("i.mp4"):
        print("Using head file: i.mp4")
        # NOTE(review): in this branch i<number>.mp4 files are NOT excluded
        # from processing, unlike the fallback branch — confirm intended.
        return "i.mp4", ["i.mp4"]

    head_pattern = re.compile(r'^i(\d+)\.mp4$', re.IGNORECASE)
    candidates = []
    for name in glob.glob("i*.mp4"):
        match = head_pattern.match(name)
        if match:
            candidates.append((int(match.group(1)), name))
    if not candidates:
        return None, []

    # Sort by the numeric suffix and pick the first (smallest number)
    candidates.sort(key=lambda item: item[0])
    baseline_file = candidates[0][1]
    print(f"Using head file (fallback): {baseline_file}")
    return baseline_file, [name for _, name in candidates]


def main():
    """Command-line entry point: smart-cut one .mp4 file or a folder of them.

    Reads the target path from ``sys.argv[1]``, changes into the target
    folder, uses the head file's duration as the cut timestamp and writes
    each processed video into a ``processed_videos`` sub-folder.  All
    problems are reported with ``print`` and end the run early.
    """
    # 1. Check if the user provided an argument
    if len(sys.argv) < 2:
        print("Usage: python cut.py <path_to_file_or_folder>")
        return

    # 2. Get the absolute path and strip any accidental quotes.
    # Both quote characters are stripped; the previous literal (' "' followed
    # by an empty string) only removed spaces and double quotes.
    raw_path = sys.argv[1].strip(' "\'')
    input_arg = os.path.abspath(raw_path)
    print(input_arg)

    # 3. Determine if it's a file or a folder
    if os.path.isfile(input_arg):
        if not input_arg.lower().endswith('.mp4'):
            print("Error: The specified file is not an .mp4 video.")
            return
        target_dir = os.path.dirname(input_arg)
        target_files = [os.path.basename(input_arg)]  # Process only this single file
    elif os.path.isdir(input_arg):
        target_dir = input_arg
        target_files = None  # We will find all files later
    else:
        print(f"Error: Path '{input_arg}' does not exist.")
        return

    # 4. Change the working directory to the target folder
    # This prevents FFmpeg from crashing due to complex Windows path names during concatenation
    os.chdir(target_dir)

    # 5. If it was a folder, grab all .mp4 files
    if target_files is None:
        target_files = glob.glob("*.mp4")

    # --- Head file detection ---
    baseline_file, head_files = _find_head_files()
    if not baseline_file:
        print(f"Error: No head file found (i.mp4 or i<number>.mp4) in: {target_dir}")
        return

    # Remove head files from the processing list.  Names are compared with
    # os.path.normcase so that, where the platform treats file names
    # case-insensitively (Windows), "I.mp4" is recognised as the head file
    # that os.path.exists("i.mp4") found.
    excluded = {os.path.normcase(name) for name in head_files}
    target_files = [f for f in target_files
                    if os.path.normcase(f) not in excluded]

    if not target_files:
        print("No other .mp4 files found to process.")
        return

    # Get exact cut timestamp from the baseline
    cut_timestamp = get_video_duration(baseline_file)
    if cut_timestamp is None:
        # get_video_duration has already reported the problem.
        return
    if cut_timestamp <= 0:
        print(f"Error: Head file '{baseline_file}' reports a duration of "
              f"{cut_timestamp} seconds; nothing to cut.")
        return

    # Create output directory
    output_dir = "processed_videos"
    os.makedirs(output_dir, exist_ok=True)

    print(f"Found {len(target_files)} video(s) to process in '{target_dir}'.\n" + "-"*30)

    # Process each video
    for video in target_files:
        output_path = os.path.join(output_dir, video)
        smart_cut(video, output_path, cut_timestamp)

    print("-" * 30 + "\nBatch processing complete!")
|
|
|
|
# Run the batch only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()