#!/usr/bin/env python3

import os
import sys
import json
import subprocess
import numpy as np
import soundfile as sf
import torchaudio
import torch
import whisper
from deep_translator import GoogleTranslator
from pyannote.audio import Pipeline
from rich.console import Console
from rich.progress import Progress, BarColumn, TextColumn, TimeRemainingColumn, TimeElapsedColumn

console = Console()

# --- Allow-list safe globals for TorchVersion (checkpoint hack) ---
from torch.serialization import add_safe_globals
from torch.torch_version import TorchVersion
from pyannote.audio.core.task import Specifications, Problem, Resolution
import dataclasses

# allow loading checkpoints that store these classes (PyTorch 2.6+ weights_only change)
add_safe_globals([TorchVersion, Specifications, Problem, Resolution])
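# Background: PyTorch 2.6 flipped torch.load() to weights_only=True by default,
# which rejects any pickled class that is not on the safe-globals allow-list.
# pyannote checkpoints embed TorchVersion and the task metadata classes above,
# hence the add_safe_globals() call. Only do this for checkpoints you trust.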

# --- FFmpeg and Whisper drive this pipeline; an HF token is required ---
if len(sys.argv) < 2:
    console.print("❌ Usage: python3 dub_pipeline.py <video.mp4>")
    sys.exit(1)

video_path = sys.argv[1]
if not os.path.exists(video_path):
    console.print("❌ Video not found!")
    sys.exit(1)

token = os.environ.get("HF_TOKEN")
if not token:
    console.print("❌ HF_TOKEN is NOT set!")
    sys.exit(1)

AUDIO_WAV = "audio.wav"

# --- 1. Extract audio with FFmpeg into a mono 16 kHz WAV ---
console.print("\n🎬 [1/6] Extracting audio from the video (mono, 16 kHz WAV)…")

if os.path.exists(AUDIO_WAV):
    os.remove(AUDIO_WAV)

with Progress(
    TextColumn("{task.description}"),
    BarColumn(),
    TimeElapsedColumn(),
    TimeRemainingColumn(),
) as progress:
    progress.add_task("FFmpeg extract running…", total=None)
    subprocess.run([
        "ffmpeg", "-y", "-i", video_path,
        "-map", "0:a:0", "-ac", "1", "-ar", "16000", "-c:a", "pcm_s16le", AUDIO_WAV
    ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

if not os.path.exists(AUDIO_WAV):
    console.print("❌ Audio extraction failed!")
    sys.exit(1)

console.print("✔ Audio extracted: audio.wav\n")
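# Why mono 16 kHz: both Whisper and the pyannote diarization models operate on
# 16 kHz single-channel audio, so extracting in that format up front avoids a
# second resampling pass later in the pipeline.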


# --- 2. Whisper STT (English speech recognition) ---
console.print("🎤 [2/6] Speech-to-text (Whisper, EN recognition)…")

model = whisper.load_model("medium")  # stable, multilingual
with Progress(TextColumn("Whisper STT…"), BarColumn(), TimeElapsedColumn(), TimeRemainingColumn()) as p:
    p.add_task("STT running…", total=None)
    result = model.transcribe(AUDIO_WAV, language="en")

segments = result.get("segments", [])
console.print(f"✔ STT done, segment count: {len(segments)}")
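# Note: on CPU-only machines Whisper warns that FP16 is unsupported and falls
# back to FP32; passing fp16=False to model.transcribe() silences the warning.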

# --- 2.5 Translate English → Hungarian ---
console.print("\n🌍 English → Hungarian translation…")
translator = GoogleTranslator(source="en", target="hu")

translated = []
for seg in segments:
    text_en = seg["text"].strip()
    text_hu = translator.translate(text_en) if text_en else ""
    translated.append({
        "speaker": "UNKNOWN",  # diarization will fill this in later
        "start": seg["start"],
        "end": seg["end"],
        "en": text_en,
        "hu": text_hu
    })

with open("segments_translated.json", "w", encoding="utf-8") as f:
    json.dump(translated, f, ensure_ascii=False, indent=2)

console.print("✔ Translated segments saved: segments_translated.json\n")
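# Note: this issues one HTTP request per segment, which is slow and easily
# rate-limited on long videos. deep_translator also exposes translate_batch();
# a minimal sketch (same translator instance, untested against quota limits):
#
#   texts_en = [seg["text"].strip() for seg in segments]
#   texts_hu = translator.translate_batch([t if t else " " for t in texts_en])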


# --- 3. Speaker diarization (pyannote) ---
console.print("👥 [3/6] Speaker diarization (pyannote/speaker-diarization-3.1)…")

waveform, sample_rate = torchaudio.load(AUDIO_WAV)
audio_dict = {"waveform": waveform, "sample_rate": sample_rate}

diar_pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    token=token
)
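# Compatibility note: older pyannote.audio releases spell this parameter
# use_auth_token= rather than token=; match whichever your installed version
# of Pipeline.from_pretrained() accepts.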

with Progress(TextColumn("Diarization running…"), BarColumn(), TimeElapsedColumn(), TimeRemainingColumn()) as p:
    p.add_task("Separating speakers…", total=None)
    diarization = diar_pipeline(audio_dict)
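# Note: the pipeline runs on CPU by default; with a CUDA build of PyTorch it
# can be moved to the GPU before invocation, e.g.
# diar_pipeline.to(torch.device("cuda")).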

# walk the diarized speaker segments
speaker_segments = []
speakers = set()

def _yield_from_annotation(annotation_obj):
    if annotation_obj is None:
        return
    if hasattr(annotation_obj, "itertracks"):
        for segment, _, speaker in annotation_obj.itertracks(yield_label=True):
            yield speaker, float(segment.start), float(segment.end)

def _yield_from_track_dicts(track_list):
    if not track_list:
        return
    for item in track_list:
        if not isinstance(item, dict):
            continue
        speaker = item.get("speaker") or item.get("label") or "UNKNOWN"
        start = float(item.get("start", 0.0))
        end = float(item.get("end", start))
        yield speaker, start, end

def extract_segments(diar_result):
    # pyannote 3.x returns an Annotation with itertracks; 4.x returns a DiarizeOutput dataclass
    if hasattr(diar_result, "itertracks"):
        yield from _yield_from_annotation(diar_result)
        return

    # dataclass (e.g., DiarizeOutput) -> inspect its fields
    if dataclasses.is_dataclass(diar_result):
        yield from extract_segments(dataclasses.asdict(diar_result))
        return

    # dict-like: prefer an "annotation" value, then "tracks", then anything annotation-like
    if isinstance(diar_result, dict):
        annotation = diar_result.get("annotation")
        if annotation is not None:
            yield from _yield_from_annotation(annotation)
            return
        yielded = False
        for seg in _yield_from_track_dicts(diar_result.get("tracks")):
            yielded = True
            yield seg
        if yielded:
            return
        for val in diar_result.values():
            if hasattr(val, "itertracks"):
                yield from _yield_from_annotation(val)
                return

    # list/tuple of segment dicts
    if isinstance(diar_result, (list, tuple)):
        for seg in _yield_from_track_dicts(diar_result):
            yield seg
        return

    console.print(f"❌ Unknown diarization output: {type(diar_result)}")
    sys.exit(1)

for speaker, start, end in extract_segments(diarization):
    speakers.add(speaker)
    speaker_segments.append({
        "speaker": speaker,
        "start": start,
        "end": end
    })

with open("diar.json", "w", encoding="utf-8") as f:
    json.dump(speaker_segments, f, ensure_ascii=False, indent=2)

console.print(f"✔ Diarization done, speakers: {sorted(speakers)}\n")

# --- 4. Assign a speaker to each translated segment by time ---
console.print("🧠 [4/6] Speaker ID: matching STT segments to speakers by time…")

def find_speaker_for_time(t):
    for s in speaker_segments:
        if s["start"] <= t <= s["end"]:
            return s["speaker"]
    return "UNKNOWN"
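# Note: keying on the start timestamp alone is fragile when Whisper and
# diarization boundaries disagree. A (hypothetical) more robust variant picks
# the speaker with the largest total overlap against the whole segment:
#
#   def find_speaker_for_span(start, end):
#       overlap = {}
#       for s in speaker_segments:
#           ov = min(end, s["end"]) - max(start, s["start"])
#           if ov > 0:
#               overlap[s["speaker"]] = overlap.get(s["speaker"], 0.0) + ov
#       return max(overlap, key=overlap.get) if overlap else "UNKNOWN"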

aligned = []
for item in translated:
    spk = find_speaker_for_time(item["start"])
    aligned.append({
        "speaker": spk,
        "start": item["start"],
        "end": item["end"],
        "en": item["en"],
        "hu": item["hu"]
    })

with open("aligned_segments.json", "w", encoding="utf-8") as f:
    json.dump(aligned, f, ensure_ascii=False, indent=2)

console.print("✔ Speaker-assigned segments saved: aligned_segments.json\n")


# --- 5. TTS: a separate Hungarian voice per speaker (edge-tts) ---
console.print("🎙 [5/6] Generating Hungarian audio, one voice per speaker…")

available_voices = [
    "hu-HU-NoemiNeural",
    "hu-HU-TamasNeural",
    "hu-HU-LillaNeural",
    "hu-HU-ImreNeural",
]
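# Note: the set of hu-HU voices varies between edge-tts releases (recent voice
# lists only show NoemiNeural and TamasNeural); `edge-tts --list-voices` prints
# what your installation actually supports, and unknown voices fail silently here.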

spk_list = sorted({s["speaker"] for s in aligned})
voice_map = {}

for i, spk in enumerate(spk_list):
    voice_map[spk] = available_voices[i % len(available_voices)]

console.print("🗣 Speaker → voice assignment:")
for k, v in voice_map.items():
    console.print(f"  {k} → {v}")

os.makedirs("tts_segments", exist_ok=True)
tts_meta = []

for i, seg in enumerate(aligned):
    spk = seg["speaker"]
    text = seg["hu"].strip()
    if not text:
        continue

    voice = voice_map.get(spk, available_voices[0])
    out_path = os.path.join("tts_segments", f"tts_{i:04d}.wav")

    subprocess.run([
        "edge-tts", "--voice", voice,
        "--text", text,
        "--write-media", out_path
    ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
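    # Note: despite the .wav extension, edge-tts emits MP3 audio by default,
    # so sf.read() in step 6 only works if libsndfile >= 1.1 (MP3 support) is
    # installed; otherwise transcode each clip to real PCM WAV first, e.g.
    # ffmpeg -i <clip> <clip>.wav.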

    if os.path.exists(out_path):
        tts_meta.append({"path": out_path, "start": seg["start"]})

console.print(f"\n✔ Generated {len(tts_meta)} TTS audio segments\n")


# --- 6. Build the Hungarian dub track and mux it back into the video ---
console.print("🎚 [6/6] Creating the Hungarian dub track and muxing it back into the video…")

if not tts_meta:
    console.print("❌ No TTS output, exiting.")
    sys.exit(1)

video_duration = aligned[-1]["end"]
target_sr = 48000
samples = int((video_duration + 1) * target_sr)
mix = np.zeros(samples, dtype=np.float32)

for tts in tts_meta:
    data_tts, sr_tts = sf.read(tts["path"])
    if data_tts.ndim > 1:
        data_tts = data_tts.mean(axis=1)  # defensive: collapse multi-channel clips to mono
    if sr_tts != target_sr:
        res = torchaudio.transforms.Resample(sr_tts, target_sr)
        data_tts = res(torch.from_numpy(data_tts).float().unsqueeze(0)).squeeze(0).numpy()

    start_i = int(tts["start"] * target_sr)
    if start_i >= len(mix):
        continue  # segment starts past the end of the mix buffer
    end_i = start_i + len(data_tts)
    if end_i > len(mix):
        data_tts = data_tts[:len(mix) - start_i]
        end_i = len(mix)

    mix[start_i:end_i] += data_tts

# peak-normalize to 0.9 full scale to avoid clipping where segments overlap
mix = mix / (np.max(np.abs(mix)) + 1e-9) * 0.9
sf.write("dub_mix.wav", mix, target_sr)
console.print("✔ Dub track saved: dub_mix.wav")
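# Note: clips are placed at their original start times but never time-stretched,
# so a long Hungarian rendering can spill into the next segment. A common
# refinement is to fit each clip to its slot with ffmpeg's atempo filter
# (factor ≈ clip_duration / slot_duration, clamped to atempo's 0.5-2.0 range).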

subprocess.run([
    "ffmpeg", "-y", "-i", video_path,
    "-i", "dub_mix.wav",
    "-map", "0:v:0", "-map", "1:a:0",
    "-c:v", "copy", "-c:a", "aac", "-shortest",
    "dubbed_output.mp4"
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

console.print("\n✅ Hungarian AI dub done: dubbed_output.mp4")
|