nextav/docs/05-nextjs-adaptation-guide.md

21 KiB

Next.js Adaptation Guide - Complete Implementation

Overview

This guide provides comprehensive strategies for implementing Stash's media streaming architecture in a Next.js application, covering process management, progress tracking, and anti-jitter mechanisms.

Key Architecture Differences

Stash (Go) Next.js (Node.js)
context.WithCancel AbortController
http.Hijacker Request events (close, aborted)
exec.Command child_process.spawn
sync.WaitGroup Promise.all + process tracking
File locks Process pool + cleanup
Graceful shutdown SIGTERM handlers

1. Core Process Management

AbortController Pattern

// lib/transcode/abort-controller.ts
import { spawn, ChildProcess } from 'child_process';

export class TranscodeController {
  private controller: AbortController;
  private process?: ChildProcess;
  private startTime: Date;
  private filePath: string;

  constructor(filePath: string) {
    this.controller = new AbortController();
    this.startTime = new Date();
    this.filePath = filePath;
  }

  async startTranscode(args: string[]): Promise<ChildProcess> {
    const { signal } = this.controller;
    
    this.process = spawn('ffmpeg', args, {
      signal,
      stdio: ['ignore', 'pipe', 'pipe'],
      detached: false
    });

    // Handle cleanup on abort
    signal.addEventListener('abort', () => {
      this.cleanup();
    });

    return this.process;
  }

  private cleanup() {
    if (this.process && !this.process.killed) {
      this.process.kill('SIGKILL');
      this.process.unref();
    }
  }

  abort() {
    this.controller.abort();
  }

  getUptime(): number {
    return Date.now() - this.startTime.getTime();
  }

  isRunning(): boolean {
    return this.process ? !this.process.killed : false;
  }
}

Process Pool Management

// lib/transcode/process-pool.ts
import { ChildProcess } from 'child_process';
import { EventEmitter } from 'events';

interface ProcessEntry {
  process: ChildProcess;
  controller: TranscodeController;
  startTime: Date;
  filePath: string;
  quality: string;
}

export class TranscodeProcessPool extends EventEmitter {
  private processes = new Map<string, ProcessEntry>();
  private maxProcesses = parseInt(process.env.MAX_TRANSCODE_PROCESSES || '4');

  add(
    key: string,
    process: ChildProcess,
    controller: TranscodeController,
    filePath: string,
    quality: string
  ): boolean {
    if (this.processes.size >= this.maxProcesses) {
      return false;
    }

    this.processes.set(key, {
      process,
      controller,
      startTime: new Date(),
      filePath,
      quality
    });

    // Auto cleanup
    process.on('exit', () => {
      this.processes.delete(key);
      this.emit('process-exit', key);
    });

    this.emit('process-added', key);
    return true;
  }

  kill(key: string): boolean {
    const entry = this.processes.get(key);
    if (entry) {
      entry.controller.abort();
      this.processes.delete(key);
      return true;
    }
    return false;
  }

  killAll(): void {
    for (const [key, entry] of this.processes) {
      entry.controller.abort();
    }
    this.processes.clear();
  }

  getActiveProcesses(): ProcessEntry[] {
    return Array.from(this.processes.values());
  }

  getStats() {
    return {
      active: this.processes.size,
      max: this.maxProcesses,
      uptime: Array.from(this.processes.values()).map(p => ({
        filePath: p.filePath,
        quality: p.quality,
        uptime: Date.now() - p.startTime.getTime()
      }))
    };
  }
}

export const transcodePool = new TranscodeProcessPool();

2. API Route Implementation

Transcode API Route

// pages/api/transcode/[...slug].ts
import { NextApiRequest, NextApiResponse } from 'next';
import { spawn } from 'child_process';
import { TranscodeController } from '../../../lib/transcode/abort-controller';
import { transcodePool } from '../../../lib/transcode/process-pool';

export const config = {
  runtime: 'nodejs',
  api: {
    responseLimit: '500mb',
  },
};

export default async function handler(
  req: NextApiRequest,
  res: NextApiResponse
) {
  if (req.method !== 'GET') {
    return res.status(405).json({ error: 'Method not allowed' });
  }

  const { file, quality = '720p', start = '0' } = req.query;
  
  if (!file || typeof file !== 'string') {
    return res.status(400).json({ error: 'File parameter required' });
  }

  const startTime = parseFloat(start as string);
  const key = `${file}:${quality}:${startTime}`;

  try {
    // Create transcode controller
    const controller = new TranscodeController(file);
    
    // Build ffmpeg arguments
    const args = [
      '-i', file,
      '-ss', startTime.toString(),
      '-c:v', 'libx264',
      '-preset', 'fast',
      '-crf', '23',
      '-c:a', 'aac',
      '-b:a', '128k',
      '-f', 'mp4',
      '-movflags', 'frag_keyframe+empty_moov',
      '-max_muxing_queue_size', '9999',
      'pipe:1'
    ];

    // Handle client disconnect
    req.on('close', () => {
      controller.abort();
    });

    req.on('aborted', () => {
      controller.abort();
    });

    // Start transcoding
    const process = await controller.startTranscode(args);
    
    // Add to process pool
    if (!transcodePool.add(key, process, controller, file, quality as string)) {
      controller.abort();
      return res.status(503).json({ error: 'Too many concurrent transcodes' });
    }

    // Set response headers
    res.setHeader('Content-Type', 'video/mp4');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Access-Control-Allow-Origin', '*');

    // Handle range requests
    const range = req.headers.range;
    if (range) {
      const parts = range.replace(/bytes=/, '').split('-');
      const start = parseInt(parts[0], 10);
      const end = parts[1] ? parseInt(parts[1], 10) : undefined;
      
      res.status(206);
      res.setHeader('Content-Range', `bytes ${start}-${end || '*'}/${'*'}`);
    }

    // Stream output
    process.stdout.pipe(res);

    // Handle errors
    process.stderr.on('data', (data) => {
      console.error(`FFmpeg error: ${data}`);
    });

    process.on('error', (error) => {
      console.error('Transcoding process error:', error);
      if (!res.headersSent) {
        res.status(500).json({ error: 'Transcoding failed' });
      }
    });

  } catch (error) {
    console.error('Transcode error:', error);
    if (!res.headersSent) {
      res.status(500).json({ error: 'Internal server error' });
    }
  }
}

HLS Manifest API

// pages/api/hls/[id].ts
import { NextApiRequest, NextApiResponse } from 'next';
import { extractVideoMetadata } from '../../../lib/video/metadata';

export default async function handler(
  req: NextApiRequest,
  res: NextApiResponse
) {
  const { id, quality = '720p' } = req.query;
  
  try {
    const metadata = await extractVideoMetadata(id as string);
    const segmentDuration = 2; // 2-second segments
    const totalSegments = Math.ceil(metadata.duration / segmentDuration);
    
    const manifest = [
      '#EXTM3U',
      '#EXT-X-VERSION:3',
      '#EXT-X-TARGETDURATION:2',
      '#EXT-X-MEDIA-SEQUENCE:0',
      ...Array.from({ length: totalSegments }, (_, i) => [
        `#EXTINF:${Math.min(segmentDuration, metadata.duration - i * segmentDuration).toFixed(3)},`,
        `/api/hls/${id}/segment/${i}?quality=${quality}`
      ]).flat(),
      '#EXT-X-ENDLIST'
    ].join('\n');

    res.setHeader('Content-Type', 'application/vnd.apple.mpegurl');
    res.setHeader('Cache-Control', 'no-cache');
    res.send(manifest);
    
  } catch (error) {
    res.status(500).json({ error: 'Failed to generate manifest' });
  }
}

3. Progress Tracking with Anti-Jitter

React Hook Implementation

// hooks/useTranscodeProgress.ts
import { useState, useEffect, useRef, useCallback } from 'react';

interface ProgressState {
  progress: number;
  buffered: number;
  isTranscoding: boolean;
  currentTime: number;
  duration: number;
}

interface TranscodeProgressOptions {
  debounceMs?: number;
  enableSmoothing?: boolean;
  maxJitter?: number;
}

export const useTranscodeProgress = (
  videoUrl: string,
  totalDuration: number,
  options: TranscodeProgressOptions = {}
) => {
  const {
    debounceMs = 100,
    enableSmoothing = true,
    maxJitter = 0.01
  } = options;

  const [state, setState] = useState<ProgressState>({
    progress: 0,
    buffered: 0,
    isTranscoding: true,
    currentTime: 0,
    duration: totalDuration
  });

  const videoRef = useRef<HTMLVideoElement>(null);
  const lastProgressRef = useRef(0);
  const lastUpdateRef = useRef(Date.now());
  const abortControllerRef = useRef<AbortController | null>(null);

  const debounce = useCallback((func: Function, wait: number) => {
    let timeout: NodeJS.Timeout;
    return (...args: any[]) => {
      clearTimeout(timeout);
      timeout = setTimeout(() => func(...args), wait);
    };
  }, []);

  const updateProgress = useCallback(debounce((newProgress: number) => {
    const now = Date.now();
    const timeSinceLastUpdate = now - lastUpdateRef.current;

    // Anti-jitter: only update if significant change or time elapsed
    const progressDiff = Math.abs(newProgress - lastProgressRef.current);
    const shouldUpdate = progressDiff > maxJitter || timeSinceLastUpdate > 500;

    if (shouldUpdate) {
      lastProgressRef.current = newProgress;
      lastUpdateRef.current = now;
      
      setState(prev => ({
        ...prev,
        progress: Math.min(newProgress, 1),
        currentTime: newProgress * totalDuration
      }));
    }
  }, debounceMs), [debounceMs, maxJitter, totalDuration]);

  const updateBuffer = useCallback(debounce(() => {
    const video = videoRef.current;
    if (!video) return;

    const buffered = video.buffered;
    if (buffered.length > 0) {
      const bufferedEnd = buffered.end(buffered.length - 1);
      const bufferedRatio = bufferedEnd / totalDuration;
      
      setState(prev => ({
        ...prev,
        buffered: Math.min(bufferedRatio, 1)
      }));
    }
  }, 250), [totalDuration]);

  const checkTranscodingStatus = useCallback(async () => {
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
    }

    abortControllerRef.current = new AbortController();

    try {
      const response = await fetch(`/api/transcode/status?url=${encodeURIComponent(videoUrl)}`, {
        signal: abortControllerRef.current.signal
      });
      
      if (response.ok) {
        const data = await response.json();
        setState(prev => ({
          ...prev,
          isTranscoding: data.transcoding,
          duration: data.duration || prev.duration
        }));
      }
    } catch (error) {
      if (error.name !== 'AbortError') {
        console.error('Failed to check transcoding status:', error);
      }
    }
  }, [videoUrl]);

  useEffect(() => {
    const video = videoRef.current;
    if (!video) return;

    const handleTimeUpdate = () => {
      updateProgress(video.currentTime / totalDuration);
    };

    const handleProgress = () => {
      updateBuffer();
    };

    const handleLoadedMetadata = () => {
      setState(prev => ({
        ...prev,
        duration: video.duration || totalDuration
      }));
    };

    video.addEventListener('timeupdate', handleTimeUpdate);
    video.addEventListener('progress', handleProgress);
    video.addEventListener('loadedmetadata', handleLoadedMetadata);

    const interval = setInterval(checkTranscodingStatus, 2000);

    return () => {
      video.removeEventListener('timeupdate', handleTimeUpdate);
      video.removeEventListener('progress', handleProgress);
      video.removeEventListener('loadedmetadata', handleLoadedMetadata);
      clearInterval(interval);
      
      if (abortControllerRef.current) {
        abortControllerRef.current.abort();
      }
    };
  }, [videoRef, totalDuration, updateProgress, updateBuffer, checkTranscodingStatus]);

  return {
    ...state,
    videoRef,
    retry: checkTranscodingStatus
  };
};

4. Video Player Component

Complete Player Implementation

// components/VideoPlayer.tsx
import React, { useRef, useEffect } from 'react';
import { useTranscodeProgress } from '../hooks/useTranscodeProgress';

interface VideoPlayerProps {
  src: string;
  poster?: string;
  autoPlay?: boolean;
  className?: string;
}

export const VideoPlayer: React.FC<VideoPlayerProps> = ({
  src,
  poster,
  autoPlay = false,
  className
}) => {
  const {
    videoRef,
    progress,
    buffered,
    isTranscoding,
    currentTime,
    duration,
    retry
  } = useTranscodeProgress(src, 0);

  const handleSeek = (time: number) => {
    if (videoRef.current) {
      videoRef.current.currentTime = time;
    }
  };

  const formatTime = (seconds: number) => {
    const mins = Math.floor(seconds / 60);
    const secs = Math.floor(seconds % 60);
    return `${mins}:${secs.toString().padStart(2, '0')}`;
  };

  return (
    <div className={`video-player ${className || ''}`}>
      <video
        ref={videoRef}
        src={src}
        poster={poster}
        autoPlay={autoPlay}
        controls
        className="video-element"
      />
      
      {isTranscoding && (
        <div className="transcoding-overlay">
          <div className="transcoding-indicator">
            <div className="spinner" />
            <span>Transcoding...</span>
          </div>
        </div>
      )}
      
      <div className="progress-bar">
        <div 
          className="progress-fill" 
          style={{ width: `${progress * 100}%` }}
        />
        <div 
          className="buffer-fill" 
          style={{ width: `${buffered * 100}%` }}
        />
      </div>
      
      <div className="time-display">
        <span>{formatTime(currentTime)} / {formatTime(duration)}</span>
      </div>
    </div>
  );
};

5. Health Monitoring

Health Check Endpoint

// pages/api/health.ts
import { NextApiRequest, NextApiResponse } from 'next';
import { transcodePool } from '../../lib/transcode/process-pool';
import { healthMonitor } from '../../lib/transcode/health-monitor';

export default function handler(req: NextApiRequest, res: NextApiResponse) {
  const stats = transcodePool.getStats();
  const memoryUsage = process.memoryUsage();
  
  res.json({
    status: 'healthy',
    timestamp: new Date().toISOString(),
    processes: stats,
    memory: {
      used: memoryUsage.heapUsed,
      total: memoryUsage.heapTotal,
      external: memoryUsage.external
    },
    uptime: process.uptime()
  });
}

Health Monitor Service

// lib/transcode/health-monitor.ts
import { transcodePool } from './process-pool';

export class HealthMonitor {
  private interval?: NodeJS.Timeout;
  private maxMemoryUsage = 0.9; // 90%
  private maxProcessUptime = 300000; // 5 minutes

  start() {
    this.interval = setInterval(() => {
      this.checkMemoryUsage();
      this.checkProcessTimeouts();
      this.logStats();
    }, 30000);
  }

  stop() {
    if (this.interval) {
      clearInterval(this.interval);
    }
  }

  private checkMemoryUsage() {
    const memUsage = process.memoryUsage();
    const ratio = memUsage.heapUsed / memUsage.heapTotal;
    
    if (ratio > this.maxMemoryUsage) {
      console.warn('High memory usage detected, cleaning up old processes');
      this.cleanupOldProcesses();
    }
  }

  private checkProcessTimeouts() {
    const processes = transcodePool.getActiveProcesses();
    
    processes.forEach(p => {
      if (p.uptime > this.maxProcessUptime) {
        console.warn(`Killing hanging process: ${p.filePath}`);
        transcodePool.kill(`${p.filePath}:${p.quality}`);
      }
    });
  }

  private logStats() {
    const stats = transcodePool.getStats();
    console.log('Transcode stats:', {
      active: stats.active,
      uptime: stats.uptime
    });
  }

  private cleanupOldProcesses() {
    const processes = transcodePool.getActiveProcesses();
    const sorted = processes.sort((a, b) => a.uptime - b.uptime);
    
    // Keep newest half, kill oldest half
    const toKeep = Math.floor(sorted.length / 2);
    sorted.slice(0, toKeep).forEach(p => {
      transcodePool.kill(`${p.filePath}:${p.quality}`);
    });
  }
}

export const healthMonitor = new HealthMonitor();

6. Production Deployment

PM2 Configuration

// ecosystem.config.js
module.exports = {
  apps: [{
    name: 'nextjs-transcode',
    script: 'node_modules/.bin/next',
    args: 'start',
    instances: 'max',
    exec_mode: 'cluster',
    max_memory_restart: '1G',
    env: {
      NODE_ENV: 'production',
      PORT: 3000,
      MAX_TRANSCODE_PROCESSES: '4',
      TRANSCODE_TIMEOUT: '300000'
    },
    kill_timeout: 10000,
    shutdown_with_message: true
  }]
};

Docker Configuration

FROM node:18-alpine

# Install FFmpeg
RUN apk add --no-cache ffmpeg

WORKDIR /app

# Copy package files
COPY package*.json ./
RUN npm ci --only=production

# Copy application code
COPY . .

# Build the application
RUN npm run build

# Set resource limits
ENV NODE_OPTIONS="--max-old-space-size=1024"
ENV MAX_TRANSCODE_PROCESSES="4"

EXPOSE 3000

CMD ["npm", "start"]

Environment Variables

# .env.production
NODE_ENV=production
MAX_TRANSCODE_PROCESSES=4
TRANSCODE_TIMEOUT=300000
FFMPEG_PATH=/usr/bin/ffmpeg
MAX_MEMORY_USAGE=0.9
HEALTH_CHECK_INTERVAL=30000

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nextjs-transcode
spec:
  replicas: 3
  selector:
    matchLabels:
      app: nextjs-transcode
  template:
    metadata:
      labels:
        app: nextjs-transcode
    spec:
      containers:
      - name: nextjs
        image: nextjs-transcode:latest
        ports:
        - containerPort: 3000
        env:
        - name: NODE_ENV
          value: "production"
        - name: MAX_TRANSCODE_PROCESSES
          value: "2"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /api/health
            port: 3000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /api/health
            port: 3000
          initialDelaySeconds: 5
          periodSeconds: 5

7. Testing Strategy

Unit Tests

// __tests__/transcode-controller.test.ts
import { TranscodeController } from '../lib/transcode/abort-controller';

describe('TranscodeController', () => {
  it('should start and abort transcoding', async () => {
    const controller = new TranscodeController('test.mp4');
    const process = await controller.startTranscode(['-i', 'test.mp4', '-f', 'null', '-']);
    
    expect(controller.isRunning()).toBe(true);
    
    controller.abort();
    
    // Wait for process to exit
    await new Promise(resolve => process.on('exit', resolve));
    
    expect(controller.isRunning()).toBe(false);
  });

  it('should handle process timeout', async () => {
    const controller = new TranscodeController('test.mp4');
    
    // Test with invalid file to force quick exit
    const process = await controller.startTranscode(['-i', 'nonexistent.mp4', '-f', 'null', '-']);
    
    const exitCode = await new Promise(resolve => process.on('exit', resolve));
    expect(exitCode).toBe(1);
  });
});

Integration Tests

// __tests__/api-transcode.test.ts
import { createServer } from 'http';
import request from 'supertest';
import { parse } from 'url';
import next from 'next';

describe('/api/transcode', () => {
  let server: any;

  beforeAll(async () => {
    const app = next({ dev: false });
    await app.prepare();
    
    server = createServer((req, res) => {
      const parsedUrl = parse(req.url, true);
      app.getRequestHandler()(req, res, parsedUrl);
    });
  });

  afterAll(() => {
    server.close();
  });

  it('should handle transcode requests', async () => {
    const response = await request(server)
      .get('/api/transcode/test.mp4')
      .expect(200);
    
    expect(response.headers['content-type']).toBe('video/mp4');
  });

  it('should handle client disconnect', async () => {
    const response = await request(server)
      .get('/api/transcode/test.mp4')
      .timeout(1000); // Force timeout
    
    expect(response.status).toBe(408);
  });
});

8. Performance Optimization

Caching Strategy

// lib/cache/transcode-cache.ts
import NodeCache from 'node-cache';

export class TranscodeCache {
  private cache = new NodeCache({
    stdTTL: 3600, // 1 hour
    checkperiod: 600, // 10 minutes
    maxKeys: 1000
  });

  get(key: string): Buffer | undefined {
    return this.cache.get(key);
  }

  set(key: string, data: Buffer, ttl?: number): boolean {
    return this.cache.set(key, data, ttl);
  }

  has(key: string): boolean {
    return this.cache.has(key);
  }

  delete(key: string): number {
    return this.cache.del(key);
  }

  getStats() {
    return this.cache.getStats();
  }
}

export const transcodeCache = new TranscodeCache();

Rate Limiting

// lib/middleware/rate-limit.ts
import rateLimit from 'express-rate-limit';

export const transcodeRateLimit = rateLimit({
  windowMs: 15 * 60 * 1000, // 15 minutes
  max: 100, // Limit each IP to 100 requests per windowMs
  message: 'Too many transcode requests',
  standardHeaders: true,
  legacyHeaders: false,
});

This comprehensive Next.js adaptation provides all the components needed to implement Stash's reliable transcoding system with modern Node.js patterns and Next.js optimizations.