# This file is part of the Open Parts Database software # Copyright (C) 2022 Valentin Lorentz # # This program is free software: you can redistribute it and/or modify it under the # terms of the GNU Affero General Public License version 3, as published by the # Free Software Foundation. # # This program is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A # PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License along with # this program. If not, see . """ Management of the cache of external pages. This package fetches web pages, archives them in the Internet Archive for future citation, and caches them in the local database for quick access by other workers. """ import datetime import re import socket import time import typing import pkg_resources import requests from opdb.db import Db, models _OPDB_VERSION = pkg_resources.require("opdb")[0].version USER_AGENT = ( f"OPDB/{_OPDB_VERSION} (Open Parts Database cacher; +https://git.tf/opdb/opdb)" ) _wayback_url_re = re.compile( r"^https?://web\.archive\.org/web/(?P[0-9]{14})/(?P.+)$" ) def _datetime_from_ia_timestamp(ia_timestamp: str) -> datetime.datetime: """ >>> _datetime_from_ia_timestamp("20220919233014") datetime.datetime(2022, 9, 19, 23, 30, 14, tzinfo=datetime.timezone.utc) """ dt = datetime.datetime.strptime(ia_timestamp, "%Y%m%d%H%M%S") # Assume it's UTC (neither the Wayback API nor the documentation mention # timezones) return dt.replace(tzinfo=datetime.timezone.utc) class Session: """ Wrapper for :class:`requests.Session`, which tries to use pages cached locally in the postgresql database, and falls back to downloading; making sure they are archived in the Internet Archive. """ def __init__( self, db: Db, min_snapshot_date: datetime.datetime, ias3_auth: str = None ): self.min_snapshot_date = min_snapshot_date self._db = db self._session = requests.Session() self._session.headers["User-Agent"] = USER_AGENT self._ias3_auth = ias3_auth def _fetch_newest_wayback_snapshot( self, url: str ) -> typing.Optional[models.WebPageSnapshot]: """ If the URL is already archived in the Internet Archive (and newer than configured with ``min_snapshot_date``), retrieves the latest snapshot available via the Wayback Machine and returns it. """ # API documentation: https://archive.org/help/wayback_api.php response = self._session.get( "https://archive.org/wayback/available", params={"url": url} ) response.raise_for_status() # TODO: retry newest_ia_snapshot = ( response.json().get("archived_snapshots", {}).get("closest", {}) ) if not newest_ia_snapshot: return None ia_timestamp = newest_ia_snapshot["timestamp"] snapshot_date = _datetime_from_ia_timestamp(ia_timestamp) if snapshot_date < self.min_snapshot_date: return None wayback_url = newest_ia_snapshot["url"] m = _wayback_url_re.match(wayback_url) assert m, f"Unexpected Wayback URL format: {wayback_url}" assert ia_timestamp == m.group( "timestamp" ), "Timestamp unexpectedly missing from snapshot URL: {wayback_url}" return self._fetch_wayback_snapshot(url, wayback_url) def _fetch_wayback_snapshot( self, url: str, wayback_url: str ) -> models.WebPageSnapshot: # Add "id_" after the timestamp in the Wayback URL; it allows fetching the # original page without the navigation header added by the Wayback Machine. # Documented at https://archive.org/post/1044859/ m = _wayback_url_re.match(wayback_url) assert m, f"Unexpected Wayback URL format: {wayback_url}" ia_timestamp = m.group("timestamp") snapshot_url = wayback_url.replace(ia_timestamp, ia_timestamp + "id_", 1) response = self._session.get(snapshot_url) response.raise_for_status() # TODO: retry return models.WebPageSnapshot( url=url, snapshot_date=_datetime_from_ia_timestamp(ia_timestamp), snapshot_url=snapshot_url, retrieved_at=datetime.datetime.now(tz=datetime.timezone.utc), retrieved_by=socket.getfqdn(), response_headers=dict(response.headers), content=response.content, ) def _save_page_now(self, url: str) -> typing.Optional[models.WebPageSnapshot]: if self._ias3_auth is None: return self._anonymous_save_page_now(url) else: return self._authenticated_save_page_now(url, self._ias3_auth) def _anonymous_save_page_now( self, url: str ) -> typing.Optional[models.WebPageSnapshot]: while True: try: response = self._session.get( f"https://web.archive.org/save/{url}", allow_redirects=False, ) response.raise_for_status() except requests.exceptions.HTTPError as e: if e.response.status_code == 429: print(e) print("Sleeping...") time.sleep(10) continue elif e.response.status_code == 520: # "Job failed". We will try again in the next workflow run. return None else: raise else: wayback_url = response.headers["Location"] return self._fetch_wayback_snapshot(url, wayback_url) def _authenticated_save_page_now( self, url: str, ias3_auth: str ) -> typing.Optional[models.WebPageSnapshot]: for _ in range(3): response = self._session.post( "https://web.archive.org/save/", allow_redirects=False, data={"url": url}, headers={ "Accept": "application/json", "Authorization": f"LOW {ias3_auth}", }, ) response.raise_for_status() if response.json().get("status_ext") == "error:too-many-daily-captures": # typically happens when a page repeatedly fails so we # (unsuccessfully) tried to capture it too many times return None job_id = response.json()["job_id"] status = "pending" while status == "pending": time.sleep(5) response = self._session.get( f"https://web.archive.org/save/status/{job_id}" ) response.raise_for_status() status = response.json()["status"] if status == "success": break if response.json()["status"] == "error": print(response.json()["message"]) time.sleep(10) continue # retry assert False, response.json() else: print("Too many failures; giving up.") return None ia_timestamp = response.json()["timestamp"] snapshot_date = _datetime_from_ia_timestamp(ia_timestamp) assert snapshot_date >= self.min_snapshot_date, ( snapshot_date.isoformat(), self.min_snapshot_date.isoformat(), ) wayback_url = f"https://web.archive.org/web/{ia_timestamp}/{url}" return self._fetch_wayback_snapshot(url, wayback_url) def _get_cached_snapshot(self, url: str) -> typing.Optional[models.WebPageSnapshot]: snapshot = self._db.get_last_web_page_snapshot(url) if snapshot is None or snapshot.snapshot_date < self.min_snapshot_date: return None return snapshot def get_or_fetch_snapshot( self, url: str ) -> typing.Optional[models.WebPageSnapshot]: """ Fetches the given URL from the local cache or from the Wayback Machine. Requests archival by the Internet Archive if the Wayback Machine does not already have the page available. """ # First, try the local cache snapshot = self._get_cached_snapshot(url) if snapshot is not None: return snapshot # Then, try fetching from the Wayback Machine (and cache it locally) snapshot = self._fetch_newest_wayback_snapshot(url) if snapshot is not None: self._db.add_web_page_snapshots([snapshot]) return snapshot # If the Internet Archive does not have it yet, trigger its Save Code Now, # and query the Wayback Machine again snapshot = self._save_page_now(url) if snapshot is not None: self._db.add_web_page_snapshots([snapshot]) return snapshot