"""Fetch, parse, and cache NationStates World Assembly (GA) resolutions."""
| import requests | |
| import xml.etree.ElementTree as ET | |
| import json | |
| import time | |
| import os | |
# --- Configuration ---
# Replace with your own nation name or contact info; the NationStates API
# requires a descriptive User-Agent identifying the script's operator.
USER_AGENT = "NS Issue Search dev update script (Jiangbei)"
# JSON cache of already-parsed resolutions (path relative to this script).
CACHE_FILE = "../parsed_ga_resolutions.json"
# Base endpoint of the NationStates API.
API_BASE_URL = "https://www.nationstates.net/cgi-bin/api.cgi"
COUNCIL_ID = 1  # 1 for General Assembly, 2 for Security Council
def load_cache(filename):
    """Load previously fetched resolutions from the JSON cache file.

    Args:
        filename: Path to the JSON cache file (stores a list of resolution dicts).

    Returns:
        A dict mapping resolution id -> resolution dict; empty dict if the
        file is missing or unreadable.
    """
    if not os.path.exists(filename):
        # BUGFIX: the message printed the literal '(unknown)' instead of the path.
        print(f"Cache file '{filename}' not found. Will start from scratch.")
        return {}
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            resolutions_list = json.load(f)
        # Key by resolution ID for fast lookups when applying repeal updates.
        return {res['id']: res for res in resolutions_list}
    except (json.JSONDecodeError, IOError) as e:
        print(f"Error reading cache file '{filename}': {e}. Starting from scratch.")
        return {}
def save_cache(filename, resolutions_dict):
    """Write the resolutions dict to the JSON cache file as an ID-sorted list.

    Args:
        filename: Destination path for the JSON cache.
        resolutions_dict: Mapping of resolution id -> resolution dict.
    """
    try:
        # Persist as a list sorted by resolution ID so the file diffs cleanly.
        sorted_resolutions = sorted(resolutions_dict.values(), key=lambda r: r['id'])
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(sorted_resolutions, f, indent=2)
        # BUGFIX: the message printed the literal '(unknown)' instead of the path.
        print(f"Successfully saved {len(sorted_resolutions)} resolutions to '{filename}'.")
    except IOError as e:
        print(f"Error writing to cache file '{filename}': {e}")
def parse_resolution_xml(xml_string):
    """Parse one NationStates API XML response into a resolution dict.

    Args:
        xml_string: Raw XML text returned by the API.

    Returns:
        A dict describing the resolution, or None when the XML is malformed
        or the <RESOLUTION> element is absent/empty (nonexistent resolution).
    """
    try:
        root = ET.fromstring(xml_string)
    except ET.ParseError as e:
        print(f"Error parsing XML: {e}")
        return None
    res_node = root.find('RESOLUTION')
    # An empty <RESOLUTION> element means the requested ID does not exist.
    if res_node is None or len(res_node) == 0:
        return None
    data = {}
    for node in res_node:
        if node.tag == 'COAUTHOR':
            # COAUTHOR holds one <N> child per co-authoring nation.
            names = [n.text for n in node.findall('N')]
            if names:
                data['co_authors'] = names
        else:
            # Store numeric-looking values as ints, everything else verbatim.
            try:
                data[node.tag.lower()] = int(node.text)
            except (ValueError, TypeError):
                data[node.tag.lower()] = node.text
    # Normalize API field names to the cache's schema.
    for api_key, our_key in (('name', 'title'), ('desc', 'body'), ('councilid', 'id')):
        if api_key in data:
            data[our_key] = data.pop(api_key)
    # Resolutions carrying a REPEALED_BY field have themselves been repealed.
    if 'repealed_by' in data:
        data['status'] = 'Repealed'
        data['repealed_by'] = {'id': data.pop('repealed_by'),
                               'timestamp': data.pop('repealed', None)}
    else:
        data['status'] = 'Active'
    # Resolutions that ARE repeals point at their target via REPEALS_RESID.
    if 'repeals_resid' in data:
        data['repeals'] = {'id': data.pop('repeals_resid'),
                           'council': data.pop('repeals_councilid')}
    return data
def main():
    """Fetch new GA resolutions from the API, merge them into the cache, save."""
    print("--- World Assembly Resolution Fetcher ---")
    # Load existing resolutions from cache
    cached_resolutions = load_cache(CACHE_FILE)
    if cached_resolutions:
        # Resume from the resolution after the highest ID already cached.
        start_id = max(cached_resolutions.keys()) + 1
        print(f"Loaded {len(cached_resolutions)} resolutions from cache. Starting fetch from GA#{start_id}.")
    else:
        start_id = 1
    # --- API Request Loop ---
    session = requests.Session()
    session.headers.update({'User-Agent': USER_AGENT})
    current_id = start_id
    newly_fetched = []
    # Conservative defaults until the first response reports the real limits.
    rate_limit_info = {
        'remaining': 50,
        'reset_in': 30
    }
    while True:
        # Pre-emptively wait when we are about to exhaust the rate limit.
        if rate_limit_info['remaining'] < 2:
            wait_time = rate_limit_info['reset_in'] + 1  # Add a small buffer
            print(f"Rate limit approaching. Waiting for {wait_time} seconds...")
            time.sleep(wait_time)
        print(f"Fetching resolution GA#{current_id}...")
        params = {'wa': COUNCIL_ID, 'id': current_id, 'q': 'resolution'}
        try:
            response = session.get(API_BASE_URL, params=params, timeout=15)
            # Update rate limit info from headers after every request
            rate_limit_info['remaining'] = int(response.headers.get('RateLimit-Remaining', 50))
            rate_limit_info['reset_in'] = int(response.headers.get('RateLimit-Reset', 30))
            # Handle API responses
            if response.status_code == 429:
                retry_after = int(response.headers.get('Retry-After', 30))
                print(f"Rate limit exceeded (429). Waiting for {retry_after} seconds as requested by API.")
                time.sleep(retry_after)
                continue  # Retry the same ID
            response.raise_for_status()  # Raises an error for other bad responses (4xx or 5xx)
        except requests.exceptions.RequestException as e:
            print(f"An error occurred during request for GA#{current_id}: {e}")
            print("Stopping script. Run again to resume.")
            break
        # Parse the response content
        parsed_data = parse_resolution_xml(response.text)
        if parsed_data:
            newly_fetched.append(parsed_data)
            current_id += 1
            time.sleep(0.7)  # Be polite: 50 requests/30s = 0.6s per request. Add a small delay.
        else:
            # API returns empty <RESOLUTION> for non-existent IDs, signaling we are done.
            print(f"GA#{current_id} does not exist. Assuming it's the last one.")
            print("--- Fetching complete. ---")
            break
    # --- Post-Fetch Processing ---
    if not newly_fetched:
        print("No new resolutions found. Cache is up-to-date.")
        return
    print(f"Fetched {len(newly_fetched)} new resolutions.")
    # Update cache with new data
    updates_made = 0
    for res in newly_fetched:
        # BUGFIX: the old check looked at the NEW resolution's own
        # 'Repealed' status and used its own id as the target, which can
        # never match a cached entry (new IDs are strictly greater than
        # anything cached). A newly passed repeal instead carries a
        # 'repeals' entry pointing at its target resolution.
        repeals = res.get('repeals')
        if repeals and repeals.get('council') == COUNCIL_ID:
            repealed_id = repeals['id']
            target = cached_resolutions.get(repealed_id)
            if target and target['status'] == 'Active':
                print(
                    f"Updating status for GA#{repealed_id}: was Active, now Repealed by GA#{res['id']}.")
                target['status'] = 'Repealed'
                # NOTE(review): assumes 'implemented' is the repeal's passage
                # time — TODO confirm against the API's REPEALED field.
                target['repealed_by'] = {
                    'id': res['id'],
                    'timestamp': res.get('implemented')
                }
                updates_made += 1
        # Add the new resolution to our collection
        cached_resolutions[res['id']] = res
    if updates_made:
        print(f"Updated the status of {updates_made} existing resolutions.")
    # Save the final, complete collection to the cache file
    save_cache(CACHE_FILE, cached_resolutions)
# Run the fetcher only when executed as a script, not when imported.
if __name__ == "__main__":
    main()