Spaces:
Running
Running
| import xml.etree.ElementTree as ElementTree | |
| import time | |
| from typing import Callable | |
| import aiohttp | |
| import asyncio | |
| class Option: | |
| __slots__ = "id", "text" | |
| def __init__(self, option_id: int, text: str): | |
| self.id = option_id | |
| self.text = text | |
| class Issue: | |
| __slots__ = "id", "title", "text", "options" | |
| def __init__(self, issue_id: int, title: str, text: str, options: list): | |
| self.id = issue_id | |
| self.title = title | |
| self.text = text | |
| self.options = options | |
| async def handle_ns_response_headers(nation: str, response: aiohttp.ClientResponse, session: aiohttp.ClientSession): | |
| if "X-Pin" in response.headers: | |
| session.headers["X-Pin"] = response.headers["X-Pin"] | |
| if "RateLimit-Remaining" in response.headers and int(response.headers["RateLimit-Remaining"]) < 10: | |
| if "RateLimit-Reset" in response.headers: | |
| sleep_time = int(response.headers["RateLimit-Reset"]) | |
| print(f"{nation}: Pausing for {sleep_time} seconds to avoid rate-limits.") | |
| await asyncio.sleep(sleep_time) | |
| print(f"{nation}: Resumed after sleeping for {sleep_time} seconds.") | |
| async def parse_issue(issue_text): | |
| try: | |
| issue_text = ElementTree.fromstring(issue_text) | |
| except ElementTree.ParseError as e: | |
| print("CRITICAL: FAILED TO PARSE XML RESPONSE FROM NATIONSTATES API.") | |
| print(f"Error: {e}") | |
| print("The full server response that caused the error was:") | |
| print(issue_text) | |
| return [] | |
| issue_list = [] | |
| for issue in issue_text[0]: | |
| issue_id = int(issue.attrib["id"]) | |
| option_list = [] | |
| for stuff in issue: | |
| if stuff.tag == "TITLE": | |
| title = stuff.text | |
| elif stuff.tag == "TEXT": | |
| issue_stuff = stuff.text | |
| elif stuff.tag == "OPTION": | |
| option_list.append( | |
| Option(option_id=int(stuff.attrib["id"]), text=stuff.text) | |
| ) | |
| try: | |
| issue_list.append( | |
| Issue( | |
| issue_id=issue_id, | |
| title=title, | |
| text=issue_stuff, | |
| options=option_list, | |
| ) | |
| ) | |
| except NameError: | |
| pass | |
| return issue_list | |
| async def get_issues(nation, ns_session): | |
| url = f"https://www.nationstates.net/cgi-bin/api.cgi" | |
| params = {"nation": nation, "q": "issues"} | |
| async with ns_session.get(url, params=params) as response: | |
| await handle_ns_response_headers(nation, response, ns_session) | |
| response_text = await response.text() | |
| issue_list = await parse_issue(response_text) | |
| for issue in issue_list: | |
| print(f"{nation}: Found Issue: {issue.id} - {issue.title}") | |
| return issue_list | |
| def format_issue(ns_issue: Issue): | |
| formatted_issue = f"""{ns_issue.title} | |
| The Issue | |
| {ns_issue.text} | |
| The Debate""" | |
| index = 1 | |
| for option in ns_issue.options: | |
| formatted_issue += f"""\n\n{index}. {option.text}""" | |
| index += 1 | |
| return formatted_issue | |
| async def execute_issues( | |
| nation: str, | |
| issues: list, | |
| compare_func: Callable[[str, dict[str, str]], list[tuple[str, float]]], | |
| prompt: str, | |
| ns_session: aiohttp.ClientSession, | |
| ): | |
| print(f"{nation}: Executing {len(issues)} issues...") | |
| execute = [] | |
| for issue in issues: | |
| print("Contacting AI...") | |
| option_dict = {option.id: option.text for option in issue.options} | |
| similarity = compare_func(prompt, option_dict) | |
| selected_option = similarity[0][0] # first option has the highest similarity | |
| print(f"{nation}: Final option ID: {selected_option}. Executing issue {issue.id} for {nation}...") | |
| issue_execution_url = f"https://www.nationstates.net/cgi-bin/api.cgi" | |
| params = { | |
| "nation": nation, | |
| "c": "issue", | |
| "issue": issue.id, | |
| "option": selected_option, | |
| } | |
| async with ns_session.get(issue_execution_url, params=params) as issue_response: | |
| if issue_response.status == 200: | |
| print(f"{nation}: Finished issue {issue.id}!") | |
| else: | |
| print(f"{nation}: Issue execution failed with error code {issue_response.status}") | |
| return execute | |
| await handle_ns_response_headers(nation, issue_response, ns_session) | |
| issue_response = await issue_response.text() | |
| execute.append(issue_response) | |
| return execute | |
| async def time_to_next_issue(nation: str, ns_session: aiohttp.ClientSession): | |
| url = "https://www.nationstates.net/cgi-bin/api.cgi" | |
| params = {"nation": nation, "q": "nextissuetime"} | |
| async with ns_session.get(url, params=params) as response: | |
| response_text = await response.text() | |
| await handle_ns_response_headers(nation, response, ns_session) | |
| try: | |
| time_text = ElementTree.fromstring(response_text) | |
| except ElementTree.ParseError as e: | |
| print("CRITICAL: FAILED TO PARSE XML RESPONSE FROM NATIONSTATES API.") | |
| print(f"Error: {e}") | |
| print("The full server response that caused the error was:") | |
| print(response_text) | |
| next_issue_time = int(time.time() + 60 * 60) | |
| return next_issue_time | |
| timestamp = int(time_text[0].text) | |
| next_issue_time = int((timestamp - time.time()) + 10) | |
| return next_issue_time | |
| async def startup_ratelimit(nation, wait_time): | |
| print( | |
| f"""Nation {nation} prepared. Sleeping for {wait_time} seconds before starting to avoid rate limits...""" | |
| ) | |
| await asyncio.sleep(wait_time) | |
| print( | |
| f"""Nation {nation} has woke up and will start automatically answering issues!""" | |
| ) | |
| async def ns_ai_bot( | |
| nation, password, compare_func, prompt, user_agent, wait_time: int | |
| ): | |
| await startup_ratelimit(nation, wait_time) | |
| async with aiohttp.ClientSession( | |
| headers={ | |
| "X-Password": password, | |
| "User-Agent": user_agent + " Nationstates AI v0.1.2-alpha", | |
| } | |
| ) as ns_session: | |
| while True: | |
| issue_list = await get_issues(nation, ns_session) | |
| if issue_list: | |
| await execute_issues( | |
| nation, | |
| issue_list, | |
| compare_func, | |
| prompt, | |
| ns_session, | |
| ) | |
| else: | |
| print(f"{nation}: No new issues found.") | |
| next_issue_time = await time_to_next_issue(nation, ns_session) | |
| if next_issue_time < 0: | |
| print(f"{nation}: Next issue time is in the past. Checking again in 60 seconds.") | |
| next_issue_time = 60 | |
| print(f"{nation}: Sleeping {next_issue_time:.2f} seconds until next issue...") | |
| await asyncio.sleep(next_issue_time) |