Skip to content

Commit

Permalink
Add stable-ts to requirements.txt and update video.json structure
Browse files Browse the repository at this point in the history
Split main.py into multiple files
  • Loading branch information
MatteoFasulo committed Dec 29, 2023
1 parent 724cfdb commit b344d57
Show file tree
Hide file tree
Showing 14 changed files with 534 additions and 587 deletions.
595 changes: 29 additions & 566 deletions main.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ torchaudio==2.0.1+cu117
torchvision==0.15.1+cu117
mkdocs-material
openai-whisper
stable-ts
tiktok-uploader
Empty file added src/__init__.py
Empty file.
77 changes: 77 additions & 0 deletions src/arg_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import argparse
import sys

# voice_manager.py
from src.voice_manager import VoicesManager

import msg


async def parse_args():
    """Parse the command-line options and resolve the TTS voice settings.

    After the plain argparse step the namespace is adjusted:

    * With ``--random_voice`` both ``--gender`` and ``--language`` are
      required (otherwise an error is printed and the process exits with
      status 1). A voice is then requested from ``VoicesManager`` and its
      ``Name`` entry is stored in ``args.tts``.
    * Without ``--random_voice`` ``args.language`` is rebuilt from the first
      two dash-separated parts of ``args.tts`` and the process exits with
      status 1 when ``VoicesManager`` reports no voice for that locale.
    * ``args.non_english`` is switched on whenever the language / voice name
      does not start with ``en``.

    Returns:
        The parsed and adjusted ``argparse.Namespace``.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--model",
        default="small",
        help="Model to use",
        choices=["tiny", "base", "small", "medium", "large"],
        type=str,
    )
    cli.add_argument(
        "--non_english",
        action='store_true',
        help="Don't use the english model.",
    )
    cli.add_argument(
        "--url",
        metavar='U',
        default="https://www.youtube.com/watch?v=intRX7BRA90",
        help="Youtube URL to download as background video.",
        type=str,
    )
    cli.add_argument(
        "--tts",
        default="en-US-ChristopherNeural",
        help="Voice to use for TTS",
        type=str,
    )
    cli.add_argument(
        "--list-voices",
        help="Use `edge-tts --list-voices` to list all voices",
        action='help',
    )
    cli.add_argument(
        "--random_voice",
        action='store_true',
        help="Random voice for TTS",
        default=False,
    )
    cli.add_argument(
        "--gender",
        choices=["Male", "Female"],
        help="Gender of the random TTS voice",
        type=str,
    )
    cli.add_argument(
        "--language",
        help="Language of the random TTS voice for example: en-US",
        type=str,
    )
    cli.add_argument(
        "--sub_format",
        help="Subtitle format",
        choices=["u", "i", "b"],
        default="b",
        type=str,
    )
    cli.add_argument(
        "--font_color",
        help="Subtitle font color in hex format: #FFF000",
        default="#FFF000",
        type=str,
    )
    cli.add_argument(
        "--upload_tiktok",
        help="Upload to TikTok after creating the video",
        action='store_true',
        default=False,
    )
    cli.add_argument("-v", "--verbose", action='store_true', help="Verbose")
    args = cli.parse_args()

    if args.random_voice:
        # The explicit --tts value is discarded; a voice is picked below.
        args.tts = None

        # Both selectors are mandatory for a random pick.
        if not (args.gender and args.language):
            print(
                f"{msg.ERROR}When using --random_voice, please specify both --gender and --language arguments.")
            sys.exit(1)

        manager = await VoicesManager().create()
        picked = await VoicesManager().find(manager, args.gender, args.language)
        args.tts = picked['Name']

        # A non-"en" locale means the english-only model must not be used.
        if not str(args.language).startswith('en'):
            args.non_english = True
    else:
        catalogue = await VoicesManager().create()
        # "en-US-ChristopherNeural" -> "en-US"
        args.language = '-'.join(args.tts.split('-')[:2])
        matches = catalogue.find(Locale=args.language)
        if len(matches) == 0:
            print(
                f"{msg.ERROR}Specified TTS voice not found. Use `edge-tts --list-voices` to list all voices.")
            sys.exit(1)

    # The voice name itself also carries the language as its first component.
    if args.tts and not args.tts.split('-')[0].startswith('en'):
        args.non_english = True

    return args
38 changes: 38 additions & 0 deletions src/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import os
import datetime
import logging
from pathlib import Path


class KeepDir:
    """Context manager that returns to the starting directory when it exits.

    The working directory that is current when the instance is *created* is
    remembered in ``original_dir``; leaving the ``with`` block changes back
    to it. Exceptions raised inside the block are not suppressed.
    """

    def __init__(self) -> None:
        # Captured at construction time, not when the ``with`` block is entered.
        self.original_dir: str = os.getcwd()

    def __enter__(self) -> "KeepDir":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Runs on normal exit and on error alike; the implicit None return
        # lets any in-flight exception propagate.
        os.chdir(self.original_dir)

    def chdir(self, path) -> None:
        """Change the process working directory to *path*."""
        os.chdir(path)


def setup_logger() -> logging.Logger:
    """Configure file logging under ``<cwd>/log`` and return this module's logger.

    A ``log`` directory is created in the current working directory if it is
    missing, and the root logger is configured (via ``logging.basicConfig``)
    with a single ``FileHandler`` writing to ``log/<today's date>.log`` at
    ``INFO`` level.

    Returns:
        The logger named after this module (``logging.getLogger(__name__)``).
    """
    log_directory = Path.cwd() / 'log'
    # exist_ok avoids the check-then-create race of a separate exists() test.
    log_directory.mkdir(exist_ok=True)

    # Hand the handler the full path instead of temporarily changing the
    # process-wide working directory just to open the file.
    log_file = log_directory / f'{datetime.date.today()}.log'
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
        ]
    )
    return logging.getLogger(__name__)
46 changes: 46 additions & 0 deletions src/subtitle_creator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import os
from pathlib import Path
import stable_whisper as whisper
import torch


def srt_create(model, path: str, series: str, part: int, text: str, filename: str) -> str:
    """Transcribe an audio file and write the result as an SRT file.

    The SRT file is written to ``<path>/<series>/<series>_<part>.srt``, where
    spaces in *series* are replaced by underscores. The target directory is
    not created here; it must already exist.

    Args:
        model: Object exposing ``transcribe(filename, regroup=..., fp16=...)``;
            presumably a stable-ts Whisper model - confirm against the caller.
        path: Base media folder.
        series: Series title used for the sub-folder and the file name.
        part: Part identifier appended to the file name.
        text: Currently unused; kept so existing callers keep working.
        filename: Audio file handed to ``model.transcribe``.

    Returns:
        The SRT path as built from *path* (not made absolute).
    """
    series = series.replace(' ', '_')

    srt_filename = f"{path}{os.sep}{series}{os.sep}{series}_{part}.srt"
    absolute_srt_path = Path(srt_filename).absolute()

    # Half precision is only requested when a CUDA device is available.
    transcribe = model.transcribe(
        filename, regroup=True, fp16=torch.cuda.is_available())
    # The return value of the regrouping chain is discarded, so these calls
    # are relied on to modify `transcribe` in place.
    transcribe.split_by_gap(0.5).split_by_length(
        38).merge_by_gap(0.15, max_words=2)
    transcribe.to_srt_vtt(str(absolute_srt_path), word_level=True)

    return srt_filename


def highlight_words(srt_file: str, subtitle_format: str = "b", font_color: str = "#FFF000") -> bool:
    """Rewrite the ``<u>`` tags of an SRT file as coloured formatting tags.

    Every ``<u>...</u>`` pair in *srt_file* becomes
    ``<font color=COLOR><TAG>...</TAG></font>`` and the file is overwritten
    in place.

    Args:
        srt_file: Path of the subtitle file to rewrite.
        subtitle_format: Tag name to use instead of ``u``; lower-cased.
        font_color: Colour value; upper-cased when it starts with ``#``,
            otherwise replaced by ``#FFF000`` after printing a notice.

    Returns:
        Always ``True``.
    """
    tag = subtitle_format.lower()

    if font_color.startswith('#'):
        colour = font_color.upper()
    else:
        print("Invalid font color. Using default color: #FFF000")
        colour = "#FFF000"

    subtitle_path = Path(srt_file)
    original = subtitle_path.read_text(encoding='UTF-8')

    # Opening tags first, then closing tags - same order as two separate passes.
    rewritten = original.replace(
        '<u>', f'<font color={colour}><{tag}>'
    ).replace(
        '</u>', f'</{tag}></font>'
    )
    subtitle_path.write_text(rewritten, encoding='UTF-8')

    print("Subtitle file formatted successfully")
    return True
8 changes: 8 additions & 0 deletions src/text_to_speech.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import edge_tts


async def tts(final_text: str, voice: str = "en-US-ChristopherNeural", stdout: bool = False, outfile: str = "tts.mp3", args=None) -> bool:
    """Synthesize *final_text* with edge-tts and save it to *outfile*.

    Args:
        final_text: Text to speak.
        voice: Voice name passed to ``edge_tts.Communicate``.
        stdout: When true, nothing is saved (no other output is produced here).
        outfile: Destination file for the synthesized audio.
        args: Unused; accepted for caller compatibility.

    Returns:
        Always ``True``.
    """
    speech = edge_tts.Communicate(final_text, voice)
    if stdout:
        return True

    await speech.save(outfile)
    return True
30 changes: 30 additions & 0 deletions src/tiktok.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import logging
import os

from tiktok_uploader.upload import upload_video

logger = logging.getLogger(__name__)


def upload_tiktok(file, title: str, tags: list, headless: bool = False):
    """Upload a video to TikTok using the session stored in ``cookies.txt``.

    Args:
        file: Video file handed to ``upload_video``.
        title: Text that starts the description.
        tags: Tag names without the leading ``#``; each is appended to the
            description as ``#tag``. May be empty.
        headless: Forwarded to ``upload_video``.

    Returns:
        ``True`` when ``upload_video`` completed without raising, ``False``
        when the cookie file is missing or the upload raised.
    """
    if not os.path.isfile('cookies.txt'):
        # Without the cookie file the upload cannot authenticate, so stop
        # here instead of logging the error and attempting it anyway.
        logger.error('Cookie file not found')
        return False

    logger.info('Cookie file found')

    if tags:
        hashtags = ' '.join(f"#{tag}" for tag in tags)
        description = f"{title} {hashtags}"
    else:
        description = title

    try:
        upload_video(file, description=description, cookies='cookies.txt',
                     comment=True, stitch=False, duet=False, headless=headless)
    except Exception as e:
        # Deliberately broad: any uploader failure is logged with its
        # traceback and reported to the caller as False.
        logger.exception(e)
        return False

    return True
95 changes: 95 additions & 0 deletions src/video_creator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import json
import subprocess
from pathlib import Path

import stable_whisper as whisper
from .logger import setup_logger
from .subtitle_creator import srt_create, highlight_words
from .text_to_speech import tts
from .tiktok import upload_tiktok
from .video_prepare import prepare_background
from utils import *

# Project root: the working directory at the moment this module is imported.
HOME = Path.cwd()
# Logger returned by setup_logger(); created once at import time.
logger = setup_logger()
# video.json is read and parsed once, at import time; importing this module
# fails if the file is missing or is not valid JSON.
video_json_path = HOME / 'video.json'
jsonData = json.loads(video_json_path.read_text(encoding='utf-8'))
# Folder under the project root that VideoCreator uses as its media path.
media_folder = HOME / 'media'


class VideoCreator:
    """Run the steps that turn the ``video.json`` entry into a finished video.

    Each step stores its result on the instance and later steps read it, so
    the methods are meant to be called in order: ``load_model`` and
    ``create_text``, then ``text_to_speech``, ``generate_transcription``,
    ``select_background``, ``integrate_subtitles`` and finally
    ``upload_to_tiktok``.
    """

    def __init__(self, args):
        """Store the parsed CLI *args* and copy the fields of ``video.json``."""
        self.args = args

        self.series = jsonData.get('series', '')
        self.part = jsonData.get('part', '')
        self.text = jsonData.get('text', '')
        self.tags = jsonData.get('tags', [])
        self.outro = jsonData.get('outro', '')
        self.path = Path(media_folder).absolute()

    def download_video(self, folder: str = 'background'):
        """Download ``self.args.url`` with yt-dlp into ``HOME / folder``.

        Raises:
            subprocess.CalledProcessError: if yt-dlp exits with a non-zero status.
        """
        directory = HOME / folder
        directory.mkdir(parents=True, exist_ok=True)

        # Run yt-dlp inside the target folder through `cwd` rather than
        # changing this process's working directory, and pass `-f` and its
        # value as two separate argv elements.
        subprocess.run(
            ['yt-dlp', '-f', 'bestvideo[ext=mp4]+bestaudio[ext=m4a]',
             '--restrict-filenames', self.args.url],
            check=True, cwd=str(directory))
        console.log(
            f"{msg.OK}Video downloaded from {self.args.url} to {folder}")
        logger.info("Video downloaded from %s to %s", self.args.url, folder)

    def load_model(self):
        """Load the Whisper model selected by the CLI arguments.

        The ``.en`` variant is used unless the model is ``large`` or
        ``args.non_english`` is set. The model is stored in ``self.model``
        and returned.
        """
        model = self.args.model
        if self.args.model != "large" and not self.args.non_english:
            model = self.args.model + ".en"
        whisper_model = whisper.load_model(model)

        self.model = whisper_model
        return whisper_model

    def create_text(self):
        """Build the text to be spoken and the path of the MP3 to produce.

        Creates ``<media>/<series>`` (spaces in the series replaced by
        underscores) if needed, stores the results in ``self.req_text`` and
        ``self.mp3_file``, and returns them as ``(req_text, filename)``.
        """
        req_text = f"{self.series} - {self.part}.\n{self.text}\n{self.outro}"
        series = self.series.replace(' ', '_')

        # pathlib keeps this method independent of an `os` name that this
        # module never imports explicitly.
        series_dir = Path(self.path) / series
        series_dir.mkdir(parents=True, exist_ok=True)
        filename = str(series_dir / f"{series}_{self.part}.mp3")

        self.req_text = req_text
        self.mp3_file = filename
        return req_text, filename

    async def text_to_speech(self):
        """Synthesize ``self.req_text`` into ``self.mp3_file`` with the chosen voice."""
        await tts(self.req_text, outfile=self.mp3_file, voice=self.args.tts, args=self.args)

    def generate_transcription(self):
        """Create the SRT for the MP3 and apply the highlight formatting.

        Stores the absolute SRT path (a ``Path``) in ``self.srt_file`` and
        returns it.
        """
        srt_filename = srt_create(
            self.model, self.path, self.series, self.part, self.text, self.mp3_file)
        srt_filename = Path(srt_filename).absolute()

        self.srt_file = srt_filename

        highlight_words(self.srt_file, subtitle_format=self.args.sub_format,
                        font_color=self.args.font_color)
        return srt_filename

    def select_background(self):
        """Pick a background video via ``random_background()`` and remember it."""
        background_mp4 = random_background()

        # NOTE: the attribute name keeps its original spelling ("backgroung")
        # because other code may already read it.
        self.mp4_backgroung = background_mp4
        return background_mp4

    def integrate_subtitles(self):
        """Render the final video from background, speech and subtitles.

        Stores the absolute path of the result (a ``Path``) in
        ``self.mp4_final_video`` and returns it.
        """
        final_video = prepare_background(
            self.mp4_backgroung, filename_mp3=self.mp3_file, filename_srt=self.srt_file, verbose=self.args.verbose)
        final_video = Path(final_video).absolute()

        self.mp4_final_video = final_video
        return final_video

    def upload_to_tiktok(self):
        """Upload the final video; the browser is headless unless ``--verbose`` is set.

        Returns whatever ``upload_tiktok`` returns.
        """
        uploaded = upload_tiktok(str(
            self.mp4_final_video), title=f"{self.series} - {self.part}", tags=self.tags, headless=not self.args.verbose)
        return uploaded
27 changes: 27 additions & 0 deletions src/video_downloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import os
import subprocess
from pathlib import Path

from msg import msg
from utils import KeepDir

# Project root: the working directory at the moment this module is imported.
HOME = Path.cwd()


def download_video(url: str, folder: str = 'background'):
    """
    Downloads a video from the given URL and saves it to the specified folder.

    The folder is resolved against ``HOME`` and created when missing. yt-dlp
    is run inside it, so the download lands there without changing this
    process's working directory.

    Args:
        url (str): The URL of the video to download.
        folder (str, optional): The name of the folder to save the video in. Defaults to 'background'.

    Raises:
        subprocess.CalledProcessError: If yt-dlp exits with a non-zero status.
    """
    directory = HOME / folder
    directory.mkdir(parents=True, exist_ok=True)

    # `-f` and its value are separate argv elements; `cwd` replaces the
    # temporary chdir that was previously done relative to the current
    # directory instead of HOME.
    subprocess.run(
        ['yt-dlp', '-f', 'bestvideo[ext=mp4]+bestaudio[ext=m4a]',
         '--restrict-filenames', url],
        check=True, cwd=str(directory))
    print(f"{msg.OK}Background video downloaded successfully")
57 changes: 57 additions & 0 deletions src/video_prepare.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import multiprocessing
import os
import subprocess
import random

from utils import *

# Project root: the working directory at the moment this module is imported.
# NOTE(review): `Path` is not imported explicitly in this module; presumably it
# arrives through `from utils import *` - confirm.
HOME = Path.cwd()


def prepare_background(background_mp4: str, filename_mp3: str, filename_srt: str, verbose: bool = False) -> str:
    """Cut a random clip from the background video and burn in audio and subtitles.

    A start offset is chosen at random so that a clip as long as the audio
    fits into the background video (offset 0 when the audio is longer than
    the video). ffmpeg then crops/scales/blurs the clip, renders the
    subtitles on top, replaces the audio track with *filename_mp3* and
    writes ``HOME/output/output_<offset>.mp4``.

    Args:
        background_mp4: Background video file.
        filename_mp3: Audio file to use as the only audio track.
        filename_srt: Subtitle file; a ``str`` or a ``pathlib.Path``.
        verbose: Print the inputs and the ffmpeg command line.

    Returns:
        Path of the rendered video as a string.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    # Durations are presumably seconds as reported by get_info - confirm.
    video_duration = int(round(get_info(background_mp4).get('duration'), 0))
    audio_info = get_info(filename_mp3)
    audio_seconds = int(round(audio_info.get('duration'), 0))

    # Clamp *before* drawing: random.randint raises ValueError for an empty
    # range, which happened whenever the audio was longer than the video.
    latest_start = max(video_duration - audio_seconds, 0)
    ss = random.randint(0, latest_start)
    clip_length = convert_time(audio_info.get('duration'))

    # Accept both str and Path for the subtitle file.
    subtitle_file = Path(filename_srt)
    srt_filename = subtitle_file.name
    srt_path = subtitle_file.parent.absolute()

    directory = HOME / 'output'
    directory.mkdir(parents=True, exist_ok=True)

    outfile = f"{HOME}{os.sep}output{os.sep}output_{ss}.mp4"

    if verbose:
        rich_print(
            f"{filename_srt = }\n{background_mp4 = }\n{filename_mp3 = }\n", style='bold green')

    # -y is placed first and -threads before the output file: anything that
    # follows the last output is a trailing option for ffmpeg.
    args = [
        "ffmpeg", "-y",
        "-ss", str(ss),
        "-t", str(clip_length),
        "-i", background_mp4,
        "-i", filename_mp3,
        "-map", "0:v",
        "-map", "1:a",
        "-filter:v",
        f"crop=ih/16*9:ih, scale=w=1080:h=1920:flags=bicubic, gblur=sigma=2, subtitles={srt_filename}:force_style=',Alignment=8,BorderStyle=7,Outline=3,Shadow=5,Blur=15,Fontsize=15,MarginL=45,MarginR=55,FontName=Lexend Bold'",
        "-c:v", "libx264", "-preset", "5",
        "-b:v", "5M",
        "-c:a", "aac", "-ac", "1",
        "-b:a", "96K",
        "-threads", f"{multiprocessing.cpu_count()//2}",
        f"{outfile}"]

    if verbose:
        rich_print('[i] FFMPEG Command:\n'+' '.join(args)+'\n', style='yellow')

    # The subtitles filter is given the bare file name, so ffmpeg has to run
    # in the subtitle's folder; `cwd` does that without touching this
    # process's own working directory.
    subprocess.run(args, check=True, cwd=str(srt_path))

    return outfile
Loading

0 comments on commit b344d57

Please sign in to comment.