from __future__ import unicode_literals
from datetime import datetime
from pydub import AudioSegment
from pydub.generators import Sine
import random
import requests
from urllib.request import Request, urlopen
import urllib.error
import string
import json
import urllib.parse
import os
os.environ["PATH"] += os.pathsep + "/opt/homebrew/bin"
import time
import yt_dlp
from tqdm import tqdm
def check_youtube_quota(apiKey):
try:
# Endpoint pour une requête simple pour tester le quota
url = f"https://www.googleapis.com/youtube/v3/search?key={apiKey}&part=snippet&type=video&q=test&maxResults=1"
# Faire une requête
response = requests.get(url)
# Si la requête est réussie
if response.status_code == 200:
print("L'API fonctionne correctement.")
print("Attention : Vous ne pouvez pas connaître précisément le quota restant via cette API.")
return True
else:
print(f"Erreur : {response.status_code} - {response.json().get('error', {}).get('message', 'Message inconnu')}")
return False
except requests.exceptions.RequestException as e:
print(f"Erreur de connexion à l'API : {e}")
return False
# delay granulaire
def apply_granular_delay(segment, delay_time=random.uniform(10, 200), feedback=random.uniform(0.1, 5), num_repeats=random.uniform(5, 10)):
delay_time = int(delay_time) # Convertir en entier
num_repeats = int(num_repeats) # Convertir en entier
delayed_segment = AudioSegment.silent(duration=len(segment))
for i in range(num_repeats):
delay_start = int(i * delay_time)
faded_segment = segment - (i * feedback * 10) # Atténuation progressive
delayed_segment = delayed_segment.overlay(faded_segment, position=delay_start)
return delayed_segment
# reverb
def apply_reverb(audio, decay=4000):
return audio # Réverbération simulée
# glitch
def process_glitch(audio, start_ms, segment_duration):
try:
if start_ms < 0 or start_ms + segment_duration > len(audio):
return None
glitch_segment = audio[start_ms:start_ms + segment_duration]
glitch_type = random.choices(
['repeat', 'invert', 'speedup', 'slowdown'],
weights=[10, 30, 30, 30],
k=1
)[0]
if glitch_type == 'repeat':
repeat_count = random.randint(1, 2)
return glitch_segment * repeat_count
elif glitch_type == 'invert':
return glitch_segment.reverse()
elif glitch_type == 'speedup':
speed_factor = random.uniform(1.2, 5)
min_duration = 150 / speed_factor
if segment_duration >= min_duration:
return glitch_segment.speedup(playback_speed=speed_factor)
else:
return glitch_segment
elif glitch_type == 'slowdown':
slow_factor = random.uniform(0.2, 0.9)
slowed_segment = glitch_segment._spawn(
glitch_segment.raw_data,
overrides={"frame_rate": int(glitch_segment.frame_rate * slow_factor)}
)
return slowed_segment.set_frame_rate(glitch_segment.frame_rate)
return glitch_segment
except Exception as e:
print(f"Error processing segment: {e}")
return None
# compression
def apply_compressor(audio, makeup_gain=20):
compressed_audio = audio.compress_dynamic_range(
threshold=-30,
ratio=15.0,
attack=5,
release=50
)
return compressed_audio + makeup_gain
# multipiste
def apply_extreme_glitch(audio, num_tracks=4):
audio_length = len(audio)
track_length = audio_length // num_tracks
tracks = []
# traitement multipiste
for track_index in range(num_tracks):
start_ms = track_index * track_length
end_ms = (track_index + 1) * track_length if track_index < num_tracks - 1 else audio_length
track = audio[start_ms:end_ms]
track = apply_effects(track) # effets sur chaque piste
tracks.append(track)
# fusion des pistes
final_audio = tracks[0]
for track in tracks[1:]:
final_audio = final_audio.overlay(track)
# limiter
final_audio = apply_compressor(final_audio)
return final_audio
# effets sur piste
def apply_effects(track):
glitch_count = random.randint(500, 1000) # Nombre de glitchs
output_audio = AudioSegment.silent(duration=0)
audio_length = len(track)
for _ in tqdm(range(glitch_count), desc="Processing glitches"):
start_ms = random.randint(0, max(0, audio_length - 500))
segment_duration = random.randint(100, 800)
segment = process_glitch(track, start_ms, segment_duration)
if segment:
choice = random.choices(
['clean', 'glitch', 'delay', 'reverb'],
weights=[10, 70, 10, 10],
k=1
)[0]
if choice == 'clean':
segment = track[start_ms:start_ms + segment_duration]
elif choice == 'delay':
segment = apply_granular_delay(segment)
elif choice == 'reverb':
segment = apply_reverb(segment, decay=random.randint(6000, 15000))
pan_value = random.uniform(-1.0, 1.0)
segment = segment.pan(pan_value)
output_audio += segment
return output_audio
# youtube downloader
def Main():
apiKey = open('apikey.txt').read().strip()
# Vérification du quota
print("API CHeck...")
if not check_youtube_quota(apiKey):
print("Problème d'API > vérifier le quota")
return # Arrêter le programme si l'API ne fonctionne pas
#keyword select
print('comment chercher les videos?')
print('1. mots anglais')
print('2. mots turques')
print('3. mots allemands')
print('4. mots japonais')
print('5. tous les languages cités ci-dessus')
print('6. cractères aléatoires')
print('7. tapez votre propre recherche')
while True:
queryChoice = input('choisir: ')
if queryChoice == "1":
keyword = "English"
break
elif queryChoice == "2":
keyword = "Turkish"
break
elif queryChoice == "3":
keyword = "German"
break
elif queryChoice == "4":
keyword = "Japanese"
break
elif queryChoice == "5":
keyword = "Random"
break
elif queryChoice == "6":
keyword = "Random letters + digits"
break
elif queryChoice == "7":
keyword = input("rechercher: ")
break
else:
print('chosissez une méthode de recherche')
print("recherche: {}".format(keyword))
#videoAmount select
print("----------------------------")
print('combien de videos? max: 200')
while True:
userChoice = int(input("nombre: "))
if userChoice in range(1,201):
print('{} videos then!'.format(userChoice))
videoAmount = userChoice
break
else:
print("Please write something between 1 and 200.")
print('nombre de videos: {}'.format(videoAmount))
print("----------------------------")
#videoDuration select
print('durée des vidéos?')
print('1. courte')
print('2. moyenne')
print('3. longue')
print('4. aléatoire')
while True:
userChoice = int(input('durée: '))
if userChoice == 1:
videoDuration = "short"
break
elif userChoice == 2:
videoDuration = "medium"
break
elif userChoice == 3:
videoDuration = "long"
break
elif userChoice == 4:
videoDuration = "any"
break
else:
print('choisissez une durée pour les videos')
#Order by
print("durée des videos {}".format(videoDuration))
print("----------------------------")
print("méthode de selection des videos")
print("1. pertinence")
print("2. Date")
print("3. note")
print("4. titre")
print("5. vues")
while True:
userChoice = input("choisir: ")
if userChoice == "1":
orderBy = "relevance"
break
elif userChoice == "2":
orderBy = "date"
break
elif userChoice == "3":
orderBy = "rating"
break
elif userChoice == "4":
orderBy = "title"
break
elif userChoice == "5":
orderBy = "viewCount"
else:
print("I don't have to tell you what to do...")
print("sélection des videos {}".format(orderBy))
print("----------------------------")
print("langue/recherche: {}".format(keyword))
print("nombre de videos: {}".format(videoAmount))
print("durée des videos: {}".format(videoDuration))
print("sélection des videos: {}".format(orderBy))
print('ok? Y/N')
print("----------------------------")
yesorno = input("Y ou N: ").lower()
if yesorno.startswith("y"):
ytfetch(queryChoice,videoAmount,videoDuration,keyword,orderBy)
else:
Main()
def Query(queryChoice):
if queryChoice == "1":
return random.choice(open('languages/english.txt').read().splitlines())
elif queryChoice == "2":
return random.choice(open('languages/turkish.txt').read().splitlines())
elif queryChoice == "3":
return random.choice(open('languages/german.txt').read().splitlines())
elif queryChoice == "4":
return random.choice(open('languages/japanese.txt').read().splitlines())
elif queryChoice == "5":
_files = os.listdir('languages/')
number = random.randint(0, len(_files) - 1)
file_ = _files[number]
query = random.choice(open('languages/{}'.format(file_)).read().splitlines())
return query
elif queryChoice == "6":
size = random.randint(3,5)
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size))
# recherche avec API
def ytfetch(queryChoice, videoAmount, videoDuration, keyword, orderBy):
apiKey = open('apikey.txt').read().strip()
iteration = 1
dumpvideos = {}
videolist = []
print("un nom pour le fichier json?")
jsonQ = input("Y ou N: ").lower()
jsonname = input("nom du fichier json: ") if jsonQ.startswith("y") else datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
while iteration <= videoAmount:
# Url de la requete
query_ = Query(queryChoice) if queryChoice in ["1", "2", "3", "4", "5", "6"] else keyword
maxresults = 1 if queryChoice in ["1", "2", "3", "4", "5", "6"] else videoAmount
query2_ = urllib.parse.quote_plus(query_)
urlData = f"https://www.googleapis.com/youtube/v3/search?key={apiKey}&maxResults={maxresults}&part=snippet&type=video&videoDuration={videoDuration}&order={orderBy}&q={query2_}"
try:
# requete
req = Request(url=urlData, headers={'User-Agent': 'Mozilla/5.0'})
response = urlopen(req).read()
results = json.loads(response.decode('utf-8'))
if 'items' not in results or not results['items']:
print(f"pas de résultat pour: {query_}")
break
for data in results['items']:
videoId = data['id']['videoId']
videoName = data['snippet']['title']
channelId = data['snippet']['channelId']
channelName = data['snippet']['channelTitle']
description = data['snippet']['description']
# durée de la video
details_url = f"https://www.googleapis.com/youtube/v3/videos?part=contentDetails&id={videoId}&key={apiKey}"
details_req = Request(url=details_url, headers={'User-Agent': 'Mozilla/5.0'})
details_response = urlopen(details_req).read()
contentDetails = json.loads(details_response.decode('utf-8'))
duration = contentDetails['items'][0]['contentDetails']['duration']
# mise a jour
update = {
str(iteration): {
'query': query_,
'videoId': videoId,
'videoName': videoName,
'channelId': channelId,
'channelName': channelName,
'description': description,
'duration': duration
}
}
print(f"{iteration}. {query_} - {videoName}")
dumpvideos.update(update)
videolist.append(videoId)
iteration += 1
except urllib.error.HTTPError as e:
print(f"HTTP Error {e.code}: {e.reason}")
except urllib.error.URLError as e:
print(f"URL Error: {e.reason}")
except Exception as e:
print(f"Unexpected error: {e}")
break
# fichier JSON
os.makedirs("./json", exist_ok=True)
with open(f'./json/{jsonname}.json', 'w') as outfile:
dumpdump = {
"Search": {
"searchDate": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"queryChoice": keyword,
"videoAmount": videoAmount,
"videoDuration": videoDuration,
"orderBy": orderBy
},
"Videos": dumpvideos
}
json.dump(dumpdump, outfile, indent=4, ensure_ascii=False)
print("fichier json prêt")
dlChoice = input("télécharger les videos? Y/N: ").lower()
if dlChoice.startswith("y"):
ytDownload(jsonname, videolist)
else:
print("--")
def apply_audio_effects(input_audio_file, output_directory):
try:
# chargement de l'audio
audio = AudioSegment.from_file(input_audio_file)
# effets
processed_audio = apply_extreme_glitch(audio)
# export
base_name = os.path.splitext(os.path.basename(input_audio_file))[0]
output_file = os.path.join(output_directory, f"{base_name}_son.mp3")
processed_audio.export(output_file, format="mp3")
print(f"export à: {output_file}")
return output_file
except Exception as e:
print(f"erreur : {e}")
return None
def ytDownload(jsonname, videolist):
output_dir = os.path.join("output", jsonname)
os.makedirs(output_dir, exist_ok=True)
class MyLogger(object):
def debug(self, msg):
pass
def warning(self, msg):
pass
def error(self, msg):
print(msg)
def my_hook(d):
if d['status'] == 'finished':
print('Download finished, converting...')
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': os.path.join(output_dir, '%(title)s.%(ext)s'),
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
'ignoreerrors': True,
'logger': MyLogger(),
'progress_hooks': [my_hook],
}
print("téléchargement...")
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download(videolist)
print(f"MP3 téléchargés à: {output_dir}.")
print("combiner les MP3? Y/N")
concat_choice = input("Select: ").lower()
if concat_choice.startswith("y"):
combined_file_path = concatenate_mp3_files(jsonname) # Call without additional path
if combined_file_path:
apply_audio_effects(combined_file_path, "output") # Apply audio effects to the concatenated file
else:
print("pas de combinaison.")
def concatenate_mp3_files(jsonname):
output_concatenated_dir = "output concatenated"
os.makedirs(output_concatenated_dir, exist_ok=True)
base_path = os.getcwd()
mp3_directory = os.path.join(base_path, "output", jsonname)
if not os.path.exists(mp3_directory):
print(f"pas de chemin: {mp3_directory}")
return None
mp3_files = [os.path.join(mp3_directory, file) for file in os.listdir(mp3_directory) if file.endswith('.mp3')]
if not mp3_files:
print(f"pas de MP3 trouvés à: {mp3_directory}.")
return None
random.shuffle(mp3_files)
print(f"combiner {len(mp3_files)} fichiers MP3...")
combined_audio = AudioSegment.empty()
# tqdm barre de prog
for mp3_file in tqdm(mp3_files, desc="Adding MP3 files", unit="file"):
print(f"Adding {mp3_file} to the combined file.")
audio_segment = AudioSegment.from_file(mp3_file, format="mp3")
combined_audio += audio_segment
# barre de prog finale
output_file = os.path.join(output_concatenated_dir, f"combined_{jsonname}.mp3")
with tqdm(total=100, desc="Exporting combined file", unit="%") as pbar:
combined_audio.export(output_file, format="mp3")
for i in range(100):
time.sleep(0.05)
pbar.update(1)
print(f"fichier créé à: {output_file}")
return output_file
# Program Starts Here
print('....ĝir2-su Macro Acoustics....')
print('-----------DNIN.URTA DEVICE-----------')
Main()