feat: add stream validation, robust error handling, and sanitized temporary filenames to smart_cut process

This commit is contained in:
tigerenwork 2026-04-13 00:34:58 +08:00
parent 25ccd53378
commit 8e7c9d374a
1 changed files with 47 additions and 8 deletions

View File

@ -4,6 +4,20 @@ import os
import glob
import sys
def probe_streams(video_path):
"""Probes the input file and returns info about its streams."""
cmd = [
'ffprobe', '-v', 'error', '-show_entries',
'stream=index,codec_type,codec_name',
'-of', 'json', video_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
try:
data = json.loads(result.stdout)
return data.get('streams', [])
except (json.JSONDecodeError, ValueError):
return []
def get_video_duration(video_path):
"""Uses ffprobe to extract the exact duration of the baseline video."""
print(f"Analyzing baseline duration: {video_path}...")
@ -46,31 +60,56 @@ def get_next_keyframe(input_video, cut_timestamp):
def smart_cut(input_video, output_video, cut_timestamp):
"""Executes the Smart Cut process: Re-encode a tiny segment, copy the rest, and stitch."""
print(f"Processing: {input_video}...")
# 0. Pre-flight: check the file actually has a video stream
streams = probe_streams(input_video)
video_streams = [s for s in streams if s.get('codec_type') == 'video']
audio_streams = [s for s in streams if s.get('codec_type') == 'audio']
if not video_streams:
print(f"Skipping {input_video}: No video stream found. Streams detected: "
f"{[s.get('codec_type') + '/' + s.get('codec_name', '?') for s in streams]}")
return
print(f" Streams: video={[s.get('codec_name') for s in video_streams]}, "
f"audio={[s.get('codec_name') for s in audio_streams]}")
next_keyframe = get_next_keyframe(input_video, cut_timestamp)
if not next_keyframe:
print(f"Skipping {input_video}: Could not find a keyframe after {cut_timestamp}s.")
return
part1 = f"temp_part1_{input_video}"
part2 = f"temp_part2_{input_video}"
concat_list = f"concat_{input_video}.txt"
# Use sanitised temp filenames to avoid issues with special chars in the original name
part1 = "temp_part1.mp4"
part2 = "temp_part2.mp4"
concat_list = "concat_list.txt"
try:
# 1. Re-encode the tiny segment (from exact cut to next keyframe)
# Audio is removed (-an) to prevent sync/overlap issues during concatenation
subprocess.run([
res1 = subprocess.run([
'ffmpeg', '-y', '-v', 'error', '-i', input_video,
'-ss', str(cut_timestamp), '-to', str(next_keyframe),
'-c:v', 'libx264', '-crf', '18', '-an', part1
], check=True)
], capture_output=True, text=True)
if res1.returncode != 0:
print(f" Step 1 failed (re-encode segment): {res1.stderr.strip()}")
return
# 2. Copy the rest of the video (from next keyframe to the end)
# Audio is removed (-an) here too
subprocess.run([
res2 = subprocess.run([
'ffmpeg', '-y', '-v', 'error', '-ss', str(next_keyframe), '-i', input_video,
'-c:v', 'copy', '-an', part2
], check=True)
], capture_output=True, text=True)
if res2.returncode != 0:
print(f" Step 2 failed (copy remainder): {res2.stderr.strip()}")
return
# Validate that both parts have content
for label, fp in [("Part 1", part1), ("Part 2", part2)]:
if not os.path.exists(fp) or os.path.getsize(fp) == 0:
print(f" Skipping {input_video}: {label} is empty — source encoding may be incompatible.")
return
# 3. Concatenate video parts and cleanly mux with the original extracted audio
with open(concat_list, 'w', encoding='utf-8') as f:
@ -86,7 +125,7 @@ def smart_cut(input_video, output_video, cut_timestamp):
print(f"Success! Saved to {output_video}")
except subprocess.CalledProcessError as e:
print(f"Error processing {input_video}. FFmpeg failed.")
print(f"Error processing {input_video}. FFmpeg failed: {e}")
finally:
# 4. Clean up temporary files safely
for temp_file in [part1, part2, concat_list]: