121 lines
3.4 KiB
Python
121 lines
3.4 KiB
Python
# -*- coding:utf-8 -*-
|
|
|
|
import time
|
|
from http.cookies import SimpleCookie
|
|
from threading import Thread, Event
|
|
import requests
|
|
from datetime import datetime
|
|
from utils import COMMON_HEADERS, logger
|
|
from accounts import accounts_info
|
|
|
|
class SunoCookie:
    """Mutable per-account state used when talking to the Suno/Clerk API.

    Holds the parsed cookie jar, the session id, the most recently stored
    token, the time the account was last marked as called, and a boolean
    status flag (its meaning is decided by callers; this class only stores it).
    """

    def __init__(self, account_id=None, session_id=None, cookie_str=None):
        """Create the state object, optionally pre-loading a raw cookie string.

        Args:
            account_id: Identifier of the account this state belongs to.
            session_id: Session id stored for later retrieval.
            cookie_str: Raw cookie string; parsed into the jar when truthy.
        """
        self.account_id = account_id
        self.session_id = session_id
        self.token = None
        self.status = True
        self.last_called = datetime.now()
        self.cookie = SimpleCookie()

        if not cookie_str:
            return
        self.load_cookie(cookie_str)

    def load_cookie(self, cookie_str):
        """Parse ``cookie_str`` and merge its cookies into the existing jar."""
        self.cookie.load(cookie_str)

    def get_cookie(self):
        """Return every stored cookie as ``name=value`` pairs joined by ``;``."""
        pairs = (f"{name}={morsel.value}" for name, morsel in self.cookie.items())
        return ";".join(pairs)

    def set_session_id(self, session_id):
        """Replace the stored session id."""
        self.session_id = session_id

    def get_session_id(self):
        """Return the stored session id."""
        return self.session_id

    def get_token(self):
        """Return the stored token (``None`` until one has been set)."""
        return self.token

    def set_token(self, token: str):
        """Replace the stored token."""
        self.token = token

    def get_status(self):
        """Return the stored status flag."""
        return self.status

    def set_status(self, status: bool):
        """Replace the stored status flag."""
        self.status = status

    def update_last_called(self):
        """Log the account id, then stamp ``last_called`` with the current time."""
        logger.info({"update_last_account_id": self.account_id})
        self.last_called = datetime.now()

    def __repr__(self):
        """Return a debug string with the account id, last-called time and status."""
        fields = (
            f"account_id={self.account_id}",
            f"last_called={self.last_called}",
            f"status={self.status}",
        )
        return "SunoCookie(" + ",".join(fields) + ")"
|
|
|
|
class LoadAccounts():
    """Iterable container holding one ``SunoCookie`` per configured account."""

    def __init__(self):
        """Start with an empty account list."""
        self.accounts = []

    def __iter__(self):
        """Iterate over the stored ``SunoCookie`` objects in insertion order."""
        return iter(self.accounts)

    def set_accounts(self, info: dict) -> None:
        """Append a ``SunoCookie`` for every entry of ``info``.

        Args:
            info: Mapping of account id to a mapping that provides the
                ``"session_id"`` and ``"cookie"`` keys.
        """
        for account_id, details in info.items():
            self.accounts.append(
                SunoCookie(
                    account_id=account_id,
                    session_id=details["session_id"],
                    cookie_str=details["cookie"],
                )
            )

    def get_accounts(self):
        """Return the internal account list (the list itself, not a copy)."""
        return self.accounts

    def reset_accounts(self, info: dict) -> None:
        """Empty the account list in place, then repopulate it from ``info``."""
        # Emptied in place so existing references to the list stay valid.
        del self.accounts[:]
        self.set_accounts(info)
|
|
|
|
|
|
def update_token(suno_cookie: SunoCookie, timeout: float = 10):
    """Request a fresh token for one account and store it on ``suno_cookie``.

    POSTs to the Clerk ``sessions/{session_id}/tokens`` endpoint, sending the
    account's current cookie string together with ``COMMON_HEADERS``. Cookies
    set by the response (if any) are merged back into ``suno_cookie``, and the
    ``jwt`` field of the JSON response body is stored as the new token.

    Args:
        suno_cookie: Account state supplying the cookie string and session
            id; it is updated in place.
        timeout: Seconds to wait on the HTTP request before ``requests``
            gives up, so a stalled connection cannot block the calling
            thread forever.

    Raises:
        requests.RequestException: If the request fails or times out.
        Exception: Whatever ``resp.json()`` raises when the response body is
            not valid JSON.
    """
    headers = {"cookie": suno_cookie.get_cookie()}
    headers.update(COMMON_HEADERS)
    session_id = suno_cookie.get_session_id()

    resp = requests.post(
        url=f"https://clerk.suno.com/v1/client/sessions/{session_id}/tokens?_clerk_js_version=5.26.1",
        headers=headers,
        timeout=timeout,
    )

    # Look the header up on resp.headers itself: it is case-insensitive,
    # whereas a plain dict copy would miss e.g. a lowercase "set-cookie".
    set_cookie = resp.headers.get("Set-Cookie")
    if set_cookie:
        # A response without Set-Cookie must not prevent the token update.
        suno_cookie.load_cookie(set_cookie)

    token = resp.json().get("jwt")
    suno_cookie.set_token(token)
|
|
|
|
def keep_alive(suno_cookie: SunoCookie, event: Event, interval: float = 5):
    """Refresh one account's token repeatedly until ``event`` is set.

    Waits on ``event`` for ``interval`` seconds between refreshes; the loop
    ends as soon as the wait reports that the event has been set.

    Args:
        suno_cookie: Account state passed to ``update_token`` on each cycle.
        event: Stop signal; setting it ends the loop.
        interval: Seconds to wait between refresh attempts.
    """
    while not event.wait(timeout=interval):
        try:
            update_token(suno_cookie)
        except Exception as e:
            # Best-effort refresh: a single failure is reported through the
            # module logger and the loop carries on to the next cycle.
            logger.error(
                {
                    "update_token_error": repr(e),
                    "account_id": suno_cookie.account_id,
                }
            )
        if event.is_set():
            # The stop signal arrived while the refresh was in progress.
            logger.info({"update_token_info": "preparing to reload thread."})
            break
|
|
|
|
def start_and_manage_threads(suno_cookies, event: Event):
    """Start one ``keep_alive`` thread per cookie and return the threads.

    Args:
        suno_cookies: Iterable of account state objects, one thread each.
        event: Event object handed to every ``keep_alive`` thread.

    Returns:
        List of the started ``Thread`` objects, in iteration order.
    """
    workers = []
    for cookie in suno_cookies:
        worker = Thread(target=keep_alive, args=(cookie, event))
        workers.append(worker)
        worker.start()
    return workers
|
|
|
|
def reload_threads(suno_cookies, threads: list, event: Event):
    """Stop the running keep-alive threads and start a fresh set.

    Sets ``event`` so the existing threads leave their loops, joins every
    thread that is still alive, clears ``event`` again, and then starts new
    threads for ``suno_cookies``.

    Args:
        suno_cookies: Iterable of account state objects for the new threads.
        threads: Previously started threads; may be empty or ``None``.
        event: Event object shared by the old and the new threads.

    Returns:
        List of the newly started ``Thread`` objects.
    """
    event.set()
    # A falsy ``threads`` (empty list or None) means there is nothing to join.
    for worker in threads or ():
        if worker.is_alive():
            worker.join()
    event.clear()
    return start_and_manage_threads(suno_cookies, event)
|