#!/usr/bin/env python3
import os
import sys
import json
import subprocess
import numpy as np
import soundfile as sf
import torchaudio
import torch
import whisper
from deep_translator import GoogleTranslator
from pyannote.audio import Pipeline
from rich.console import Console
from rich.progress import Progress, BarColumn, TextColumn, TimeRemainingColumn, TimeElapsedColumn

console = Console()

# --- Allow-list safe globals for TorchVersion (checkpoint loading hack) ---
from torch.serialization import add_safe_globals
from torch.torch_version import TorchVersion
from pyannote.audio.core.task import Specifications, Problem, Resolution
import dataclasses

# Allow loading checkpoints that store these classes (PyTorch 2.6+ weights_only change)
add_safe_globals([TorchVersion, Specifications, Problem, Resolution])

# --- FFmpeg and Whisper drive the local pipeline; an HF token is required ---
if len(sys.argv) < 2:
    console.print("❌ Usage: python3 dub_pipeline.py <video_file>")
    sys.exit(1)

video_path = sys.argv[1]
if not os.path.exists(video_path):
    console.print("❌ Video not found!")
    sys.exit(1)

token = os.environ.get("HF_TOKEN")
if not token:
    console.print("❌ HF_TOKEN is NOT set!")
    sys.exit(1)

AUDIO_WAV = "audio.wav"

# --- 1. Extract audio with FFmpeg into a mono 16 kHz WAV ---
console.print("\n🎬 [1/6] Extracting audio from video (mono, 16 kHz WAV)…")
if os.path.exists(AUDIO_WAV):
    os.remove(AUDIO_WAV)

with Progress(
    TextColumn("{task.description}"),
    BarColumn(),
    TimeElapsedColumn(),
    TimeRemainingColumn(),
) as progress:
    progress.add_task("FFmpeg extract running…", total=None)
    subprocess.run([
        "ffmpeg", "-y", "-i", video_path,
        "-map", "0:a:0", "-ac", "1", "-ar", "16000", "-c:a", "pcm_s16le", AUDIO_WAV,
    ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

if not os.path.exists(AUDIO_WAV):
    console.print("❌ Audio extraction failed!")
    sys.exit(1)
console.print("✔ Audio extracted: audio.wav\n")

# --- 2. Whisper STT (English speech recognition) ---
console.print("🎤 [2/6] Speech-to-Text (Whisper, EN recognition)…")
model = whisper.load_model("medium")  # stable, multilingual
with Progress(TextColumn("Whisper STT…"), BarColumn(), TimeElapsedColumn(), TimeRemainingColumn()) as p:
    p.add_task("STT running…", total=None)
    result = model.transcribe(AUDIO_WAV, language="en")

segments = result.get("segments", [])
console.print(f"✔ STT done, number of segments: {len(segments)}")

# --- 2.5 Translation: English → Hungarian ---
console.print("\n🌍 English → Hungarian translation…")
translator = GoogleTranslator(source="en", target="hu")
translated = []
for seg in segments:
    text_en = seg["text"].strip()
    text_hu = translator.translate(text_en) if text_en else ""
    translated.append({
        "speaker": "UNKNOWN",  # filled in later by diarization
        "start": seg["start"],
        "end": seg["end"],
        "en": text_en,
        "hu": text_hu,
    })

with open("segments_translated.json", "w", encoding="utf-8") as f:
    json.dump(translated, f, ensure_ascii=False, indent=2)
console.print("✔ Translated segments saved: segments_translated.json\n")
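# Hedged sketch, not wired into the loop above: deep-translator's GoogleTranslator
# talks to a web endpoint, so individual calls can fail transiently. This
# hypothetical helper, translate_with_retry, retries with exponential backoff and
# falls back to an empty string, mirroring the empty-text case above.
import time

def translate_with_retry(text, retries=3, base_delay=1.0):
    for attempt in range(retries):
        try:
            return translator.translate(text)
        except Exception as exc:  # deep-translator surfaces request/HTTP errors
            if attempt == retries - 1:
                console.print(f"⚠ Translation failed after {retries} attempts: {exc}")
                return ""
            time.sleep(base_delay * (2 ** attempt))  # back off: 1 s, 2 s, 4 s, …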
# --- 3. Speaker diarization (pyannote) ---
console.print("👥 [3/6] Speaker diarization (pyannote/speaker-diarization-3.1)…")
waveform, sample_rate = torchaudio.load(AUDIO_WAV)
audio_dict = {"waveform": waveform, "sample_rate": sample_rate}

diar_pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    token=token,
)

with Progress(TextColumn("Diarization running…"), BarColumn(), TimeElapsedColumn(), TimeRemainingColumn()) as p:
    p.add_task("Separating speakers…", total=None)
    diarization = diar_pipeline(audio_dict)

# Walk the speaker segments
speaker_segments = []
speakers = set()

def _yield_from_annotation(annotation_obj):
    if annotation_obj is None:
        return
    if hasattr(annotation_obj, "itertracks"):
        for segment, _, speaker in annotation_obj.itertracks(yield_label=True):
            yield speaker, float(segment.start), float(segment.end)

def _yield_from_track_dicts(track_list):
    if not track_list:
        return
    for item in track_list:
        if not isinstance(item, dict):
            continue
        speaker = item.get("speaker") or item.get("label") or "UNKNOWN"
        start = float(item.get("start", 0.0))
        end = float(item.get("end", start))
        yield speaker, start, end

def extract_segments(diar_result):
    # pyannote 3.x returns an Annotation with itertracks; 4.x returns a DiarizeOutput/dataclass
    if hasattr(diar_result, "itertracks"):
        for segment, _, speaker in diar_result.itertracks(yield_label=True):
            yield speaker, float(segment.start), float(segment.end)
        return
    # dataclass (e.g., DiarizeOutput) -> inspect its fields
    if dataclasses.is_dataclass(diar_result):
        data_dict = dataclasses.asdict(diar_result)
        yield from extract_segments(data_dict)
        return
    # dict-like
    if isinstance(diar_result, dict):
        annotation = diar_result.get("annotation")
        if annotation is not None:
            yield from _yield_from_annotation(annotation)
            return
        tracks = diar_result.get("tracks")
        yielded = False
        for seg in _yield_from_track_dicts(tracks):
            yielded = True
            yield seg
        if yielded:
            return
        # try any nested value that is annotation-like
        for val in diar_result.values():
            if hasattr(val, "itertracks"):
                yield from _yield_from_annotation(val)
                return
    # last resort: a list/tuple of dicts
    if isinstance(diar_result, (list, tuple)):
        for seg in _yield_from_track_dicts(diar_result):
            yield seg
        return
    console.print(f"❌ Unknown diarization output: {type(diar_result)}")
    sys.exit(1)

for speaker, start, end in extract_segments(diarization):
    speakers.add(speaker)
    speaker_segments.append({"speaker": speaker, "start": start, "end": end})

with open("diar.json", "w", encoding="utf-8") as f:
    json.dump(speaker_segments, f, ensure_ascii=False, indent=2)
console.print(f"✔ Diarization done, speakers: {sorted(speakers)}\n")

# --- 4. Fill in the speaker for each translated segment based on speaker timing ---
console.print("🧠 [4/6] Assigning speakers to STT segments by time…")

def find_speaker_for_time(t):
    for s in speaker_segments:
        if s["start"] <= t <= s["end"]:
            return s["speaker"]
    return "UNKNOWN"

aligned = []
for item in translated:
    spk = find_speaker_for_time(item["start"])
    aligned.append({
        "speaker": spk,
        "start": item["start"],
        "end": item["end"],
        "en": item["en"],
        "hu": item["hu"],
    })

with open("aligned_segments.json", "w", encoding="utf-8") as f:
    json.dump(aligned, f, ensure_ascii=False, indent=2)
console.print("✔ Speaker-assigned segments saved: aligned_segments.json\n")
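# Illustrative alternative, not used above: matching on the segment's start time
# alone can mislabel a Whisper segment that straddles a speaker turn. This
# hypothetical find_speaker_by_overlap scores each diarization turn by its
# temporal overlap with the [start, end] window and keeps the best match.
def find_speaker_by_overlap(start, end):
    best_speaker, best_overlap = "UNKNOWN", 0.0
    for s in speaker_segments:
        overlap = min(end, s["end"]) - max(start, s["start"])
        if overlap > best_overlap:
            best_speaker, best_overlap = s["speaker"], overlap
    return best_speaker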
# --- 5. TTS: a separate Hungarian voice per speaker (edge-tts) ---
console.print("🎙 [5/6] Generating Hungarian audio with a separate voice per speaker…")

available_voices = [
    "hu-HU-NoemiNeural",
    "hu-HU-TamasNeural",
    "hu-HU-LillaNeural",
    "hu-HU-ImreNeural",
]

spk_list = sorted({s["speaker"] for s in aligned})
voice_map = {}
for i, spk in enumerate(spk_list):
    voice_map[spk] = available_voices[i % len(available_voices)]

console.print("🗣 Speaker → voice assignment:")
for k, v in voice_map.items():
    console.print(f"  {k} → {v}")

os.makedirs("tts_segments", exist_ok=True)
tts_meta = []
for i, seg in enumerate(aligned):
    spk = seg["speaker"]
    text = seg["hu"].strip()
    if not text:
        continue
    voice = voice_map.get(spk, available_voices[0])
    out_path = os.path.join("tts_segments", f"tts_{i:04d}.wav")
    # Note: edge-tts typically emits MP3 data regardless of the .wav extension;
    # soundfile (libsndfile >= 1.1) can usually still decode it below.
    subprocess.run([
        "edge-tts", "--voice", voice, "--text", text, "--write-media", out_path,
    ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    if os.path.exists(out_path):
        tts_meta.append({"path": out_path, "start": seg["start"]})

console.print(f"\n✔ {len(tts_meta)} TTS audio segments generated\n")

# --- 6. Assemble the Hungarian TTS track and mux it back into the video ---
console.print("🎚 [6/6] Building the Hungarian dub track and remuxing it into the video…")
if not tts_meta:
    console.print("❌ No TTS output, exiting.")
    sys.exit(1)

# Approximate the track length with the last segment's end time (+1 s of tail room)
video_duration = aligned[-1]["end"]
target_sr = 48000
samples = int((video_duration + 1) * target_sr)
mix = np.zeros(samples, dtype=np.float32)

for tts in tts_meta:
    data_tts, sr_tts = sf.read(tts["path"])
    if data_tts.ndim > 1:  # fold any multi-channel TTS output down to mono
        data_tts = data_tts.mean(axis=1)
    if sr_tts != target_sr:
        res = torchaudio.transforms.Resample(sr_tts, target_sr)
        data_tts = res(torch.from_numpy(data_tts).float().unsqueeze(0)).squeeze(0).numpy()
    start_i = int(tts["start"] * target_sr)
    end_i = start_i + len(data_tts)
    if end_i > len(mix):
        data_tts = data_tts[:len(mix) - start_i]
        end_i = len(mix)
    mix[start_i:end_i] += data_tts

# Peak-normalize to 0.9 full scale to avoid clipping where segments overlap
mix = mix / (np.max(np.abs(mix)) + 1e-9) * 0.9
sf.write("dub_mix.wav", mix, target_sr)
console.print("✔ Dub track saved: dub_mix.wav")

# Caveat: -shortest trims the output to the shorter stream, so any video past
# the end of the dub track is cut.
subprocess.run([
    "ffmpeg", "-y", "-i", video_path,
    "-i", "dub_mix.wav",
    "-map", "0:v:0", "-map", "1:a:0",
    "-c:v", "copy", "-c:a", "aac", "-shortest",
    "dubbed_output.mp4",
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

console.print("\n✅ Hungarian AI dub done: dubbed_output.mp4")
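# Optional sanity check (a sketch, assuming ffprobe is available alongside ffmpeg):
# because of -shortest above, the dubbed file can end at the dub track rather than
# at the full video, so comparing durations makes any truncation visible.
def probe_duration(path):
    out = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1", path],
        capture_output=True, text=True,
    )
    return float(out.stdout.strip() or 0.0)

console.print(f"ℹ Duration check: source {probe_duration(video_path):.1f}s, dubbed {probe_duration('dubbed_output.mp4'):.1f}s")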