123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- import json
- import glob
- import string
- import unicodedata
- from datetime import datetime
- import os
- import subprocess
- import threading
- from concurrent.futures import ThreadPoolExecutor
# Substring looked for in each subtitle utterance by exJson.
findText = "琪琪"
# Clip that replace_audio queues after every cut segment that has an end time.
replace_audio_file_name = "resource/xiaodou002.mp3"
def count_all_punctuation(text):
    """Return the number of punctuation characters in *text*.

    A character is counted when it appears in ``string.punctuation``, in the
    explicit list of marks below, or has a Unicode general category that
    starts with ``P``.
    """
    extra_marks = ",。!?《》【】、"

    def is_punctuation(ch):
        return (
            ch in string.punctuation
            or ch in extra_marks
            or unicodedata.category(ch).startswith('P')
        )

    return sum(1 for ch in text if is_punctuation(ch))
def execCMD(command):
    """Run *command* through the shell and report the outcome on stdout.

    The command line is echoed first. Output of the child process is
    captured; its stderr is printed only when the exit status is non-zero.
    Any exception raised while running or reporting is printed instead of
    being propagated, so the function always returns ``None``.
    """
    print(command)
    try:
        # Run the command and capture its result.
        completed = subprocess.run(
            command,
            shell=True,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if completed.returncode != 0:
            print(f"命令执行失败: {completed.stderr}")
            return
        print("命令执行成功!")
    except Exception as e:
        print(f"执行命令时发生错误: {e}")
def mergeAudio(file_list_name):
    """Concatenate the files named in an ffmpeg concat list into one MP3.

    Args:
        file_list_name: Path of the text file handed to ffmpeg's ``concat``
            demuxer. The output is written next to it, with the final
            extension replaced by ``.mp3``.

    Returns:
        None. Success or failure (with ffmpeg's stderr) is printed; errors
        raised while launching ffmpeg are printed rather than propagated.
    """
    # Swap only the trailing extension; str.replace would also rewrite a
    # ".txt" that happens to appear earlier in the path.
    output_mp3_name = os.path.splitext(file_list_name)[0] + ".mp3"
    # The previous option string ("-c -c:a libmp3lame -b:a 128k copy") made
    # "-c" consume "-c:a" as its codec name. Re-encoding to MP3 at 128k is
    # expressed with "-c:a libmp3lame -b:a 128k" alone.
    # An argument list (no shell) keeps paths containing spaces intact.
    command = [
        "ffmpeg",
        "-f", "concat",
        "-safe", "0",
        "-i", file_list_name,
        "-c:a", "libmp3lame",
        "-b:a", "128k",
        # Overwrite without asking: the prompt would be written to the
        # captured stderr and never be seen.
        "-y",
        output_mp3_name,
    ]
    print(" ".join(command))
    try:
        # Run the command and capture its result.
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        if result.returncode == 0:
            print("音频合并成功!")
        else:
            print(f"音频合并执行失败: {result.stderr}")
    except Exception as e:
        print(f"执行命令时发生错误: {str(e)}")
def mergeAudio002(audios, mergeAudioName):
    """Join several audio files end to end with ffmpeg's ``concat`` filter.

    For two inputs the resulting invocation is equivalent to::

        ffmpeg -i a.mp3 -i b.mp3 -filter_complex "[0][1]concat=n=2:v=0:a=1" -y out.mp3

    Args:
        audios: Sequence of input file paths, in playback order.
        mergeAudioName: Path of the merged output file (overwritten if it
            exists).

    Returns:
        None. Success or failure (with ffmpeg's stderr) is printed; errors
        raised while launching ffmpeg are printed rather than propagated.
    """
    if not audios:
        # concat=n=0 is not a usable filter; report instead of spawning ffmpeg.
        print("音频合并执行失败: 没有可合并的音频")
        return

    # Build an argument list rather than a shell string so that file names
    # containing spaces or shell metacharacters are passed through verbatim.
    command = ["ffmpeg"]
    for audio in audios:
        command += ["-i", audio]
    # One "[i]" label per input, in order, feeding a single concat filter.
    input_labels = "".join(f"[{i}]" for i in range(len(audios)))
    command += [
        "-filter_complex",
        f"{input_labels}concat=n={len(audios)}:v=0:a=1",
        "-y",
        mergeAudioName,
    ]
    try:
        # Run the command and capture its result.
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        if result.returncode == 0:
            print("音频合并成功!")
        else:
            print(f"音频合并执行失败: {result.stderr}")
    except Exception as e:
        print(f"执行命令时发生错误: {str(e)}")
def merge_audio_video(video_file, audio_file, output_file):
    """Mux the video stream of one file with the audio stream of another.

    The first video stream of *video_file* is copied unchanged and the first
    audio stream of *audio_file* is encoded as AAC.

    Args:
        video_file: Path of the file supplying the video stream.
        audio_file: Path of the file supplying the audio stream.
        output_file: Path of the file to write (overwritten if it exists).

    Returns:
        None. Success or failure (with ffmpeg's stderr) is printed; errors
        raised while launching ffmpeg are printed rather than propagated.
    """
    # Argument list instead of a shell string: paths with spaces or shell
    # metacharacters reach ffmpeg unmodified.
    command = [
        "ffmpeg",
        "-i", video_file,
        "-i", audio_file,
        "-map", "0:v:0",
        "-map", "1:a:0",
        "-c:v", "copy",
        "-c:a", "aac",
        "-strict", "experimental",
        # Same as the other ffmpeg calls in this file: overwrite without
        # prompting, since the prompt would land in the captured stderr.
        "-y",
        output_file,
    ]
    try:
        # Run the command and capture its result.
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        if result.returncode == 0:
            print("音频替换成功!")
        else:
            print(f"音频替换执行失败: {result.stderr}")
    except Exception as e:
        print(f"执行命令时发生错误: {str(e)}")
def replace_audio(splits):
    """Cut the source audio into segments and queue the replacement clip.

    Each entry of *splits* is a dict with ``'filePath'`` and ``'startTime'``
    and, optionally, ``'endTime'``. Every entry is extracted with ffmpeg
    into ``output/<name>0000<index>.mp3``. After each segment that has an
    ``'endTime'``, ``replace_audio_file_name`` is queued as the next piece.

    Args:
        splits: List of segment dicts as described above.

    Returns:
        ``(audios, file_name)``: the ordered list of files to concatenate and
        the base name (without extension) of the source audio. When *splits*
        has fewer than two entries there is nothing to replace and
        ``([], None)`` is returned.
    """
    # A single open-ended segment means no match was found; an empty list is
    # handled the same way instead of failing on splits[0].
    if len(splits) <= 1:
        return [], None

    # Strip only the trailing extension of the source file name.
    file_name = os.path.splitext(os.path.basename(splits[0]['filePath']))[0]
    audios = []
    for index, split in enumerate(splits):
        segment_path = f"output/{file_name}0000{index}.mp3"
        bounded = 'endTime' in split
        # "-to" is only passed for bounded segments; the last segment runs to
        # the end of the file.
        end_option = f" -to {split['endTime']}" if bounded else ""
        # NOTE(review): paths are interpolated unquoted into a shell command,
        # so a path containing spaces will not survive execCMD.
        cmd = f"ffmpeg -i {split['filePath']} -ss {split['startTime']}{end_option} -ac 2 -ar 44100 -ab 128k -y {segment_path}"
        audios.append(segment_path)
        if bounded:
            # The gap after a bounded segment is filled with the replacement clip.
            audios.append(replace_audio_file_name)
        execCMD(cmd)
    return audios, file_name
def timeConversion(time):
    """Format a duration in milliseconds as ``HH:MM:SS.mmm``.

    Hours, minutes and seconds are zero-padded to two digits and the
    milliseconds to three; hours are not wrapped at 24.
    """
    whole_seconds, millis = divmod(time, 1000)
    whole_minutes, secs = divmod(whole_seconds, 60)
    hours, mins = divmod(whole_minutes, 60)
    return f"{hours:02}:{mins:02}:{secs:02}.{millis:03}"
def exJson(path):
    """Replace every spoken occurrence of ``findText`` in one recording.

    Reads the subtitle JSON at *path* and collects the start and end time of
    each occurrence of ``findText``. The audio around those occurrences is
    cut by ``replace_audio``, the pieces are joined by ``mergeAudio002``
    into ``merge/<name>0000.mp3``, and ``merge_audio_video`` muxes that onto
    ``input/<name>.mp4`` as ``output_video/<name>.mp4``. Nothing is produced
    when ``findText`` does not occur.

    The JSON is read as ``data["resp"]["utterances"]``: items with a
    ``"text"`` string and a ``"words"`` list whose entries carry
    ``"start_time"`` and ``"end_time"`` (formatted by ``timeConversion``,
    which treats them as milliseconds).

    Args:
        path: Path of the subtitle ``.json`` file. The matching audio is
            looked up under the same directory with ``subtitle`` replaced
            by ``input`` and the extension replaced by ``.mp3``.
    """
    audio_dir = os.path.dirname(path).replace("subtitle", "input")
    # Swap only the extension; replacing "json" anywhere would corrupt base
    # names that contain it.
    audio_name = os.path.splitext(os.path.basename(path))[0] + ".mp3"
    file_path = audio_dir + "/" + audio_name

    # Only the parsed data is needed afterwards, so the file is closed before
    # the ffmpeg work starts.
    with open(path, 'r', encoding='utf-8') as file:
        jsonData = json.load(file)

    needle_len = len(findText)
    # Cut points: 0, then (start, end) of each occurrence, in order.
    # NOTE(review): assumes utterances are in chronological order - confirm.
    times = [0]
    for utterance in jsonData["resp"]["utterances"]:
        text = utterance["text"]
        pos = text.find(findText)
        while pos != -1:
            words = utterance["words"]
            # NOTE(review): this mapping assumes "words" holds one entry per
            # non-punctuation character of "text" - confirm against the
            # subtitle producer.
            first = pos - count_all_punctuation(text[:pos])
            # Index of the LAST matched word. The previous
            # first + len(findText) pointed at the word after the match and
            # raised IndexError when the match ended the utterance.
            last = first + needle_len - 1
            if 0 <= first <= last < len(words):
                times.append(words[first]["start_time"])
                times.append(words[last]["end_time"])
            else:
                print(f"无法定位匹配文本,已跳过: {path}")
            # Continue after this occurrence so repeated matches in the same
            # utterance are handled too (max() guards an empty findText).
            pos = text.find(findText, pos + max(needle_len, 1))

    # Pair the cut points into kept segments: [0, s1], [e1, s2], ..., [eN, end).
    splits = []
    for i in range(0, len(times), 2):
        segment = {'filePath': file_path, 'startTime': timeConversion(times[i])}
        if i + 1 < len(times):
            segment['endTime'] = timeConversion(times[i + 1])
        splits.append(segment)

    audios, file_name = replace_audio(splits)
    if not audios:
        return
    merged_audio = f"merge/{file_name}0000.mp3"
    mergeAudio002(audios, merged_audio)
    merge_audio_video(f"input/{file_name}.mp4", merged_audio, f"output_video/{file_name}.mp4")
if __name__ == '__main__':
    # Remove the products of any previous run, directory by directory.
    stale_patterns = (
        r"D:\replace_audio_script\merge\*",
        r"D:\replace_audio_script\output\*",
        r"D:\replace_audio_script\output_video\*",
    )
    for pattern in stale_patterns:
        for stale_file in glob.glob(pattern):
            os.remove(stale_file)
    # Process every subtitle file found.
    for subtitle_json in glob.glob(r"D:\replace_audio_script\subtitle\*.json"):
        exJson(subtitle_json)
|