#!/usr/bin/env python3
# vim: tabstop=4 shiftwidth=4 expandtab

import html
import time
import urllib.parse
from argparse import Namespace
from collections.abc import Iterable
from datetime import datetime
from typing import Optional

import gemcall

from gemini_antenna.URLHelper import URLHelper
from gemini_antenna.db import AntennaDB
from gemini_antenna.signoffs import getsig
from gemini_antenna.multiFeedParsing import (parsetwtxt, parsegemsub, parsexml,
                                             FeedEntry, TwtxtEntry)

# Teach urllib how to join and split gemini:// URLs.
urllib.parse.uses_relative.append("gemini")
urllib.parse.uses_netloc.append("gemini")

def formatTime(timestamp: Optional[float], dateformat: str) -> str:
    """
    Convert a timestamp to a string according to a format specification.

    :param timestamp: Seconds since the Epoch. If `None`, the current time is used.
    :param dateformat: strftime(3) format specification.
    """
    if timestamp is None:
        timestamp = time.time()
    utc = datetime.utcfromtimestamp(timestamp)
    return utc.strftime(dateformat)
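
# A couple of illustrative calls (not executed here); the results follow directly
# from strftime(3) and are shown for orientation only:
#   formatTime(0, '%Y-%m-%d')               -> '1970-01-01'
#   formatTime(None, '%Y-%m-%dT%H:%M:%SZ')  -> the current UTC time, e.g. '2023-06-01T12:00:00Z'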

def printPage(pagetext: str, output=None) -> None:
    """ Write a finished text/gemini page to `output`, or to stdout if no file is given. """
    if output:
        with open(output, "w") as outputfile:
            outputfile.write(pagetext)
    else:
        print(pagetext)

def updateStatFile(urlNr: int, entryNr: int, output=None) -> None:
    """ Append a statistics line (timestamp, feed count, entry count, ratio) as TSV. """
    # Avoid ZeroDivisionError when there are no feeds at all.
    if urlNr == 0:
        return
    timestr = formatTime(None, '%Y-%m-%dT%H:%M:%SZ')
    line = "{0}\t{1}\t{2}\t{3:.2}".format(timestr, urlNr, entryNr, entryNr / urlNr)
    if output:
        with open(output, "a") as outputfile:
            print(line, file=outputfile)
    else:
        print(line)
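
# Sketch of the line this appends, with tabs written out as \t (values are made
# up for illustration):
#   2023-06-01T12:00:00Z\t12\t30\t2.5
# i.e. timestamp, number of distinct feed URLs, number of entries, and entries
# per feed rendered with two significant digits.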

def generateFeedPage(entries: Iterable[FeedEntry], output=None) -> None:
    """ Generate the Antenna front page as text/gemini. """
    pagetext = '''# Antenna
## Receiving Transmissions From Geminispace
=> about.gmi What is this?
=> cgi-bin/submit Send transmission!
'''
    datestamp = "0000-00-00"
    for entry in entries:
        timestamp = formatTime(entry.updated, '%Y-%m-%d')
        if datestamp != timestamp:
            # Start a new day group with a blank line.
            datestamp = timestamp
            pagetext += "\n"
        pagetext += f"=> {entry.link} {timestamp} {entry.author}: {entry.title}\n"
    pagetext += f'''
> {getsig()}
=> cgi-bin/log Tail the log
=> twtxt.gmi Antenna's twtxt page.
=> atom.xml Antenna's Atom feed.
=> cgi-bin/filter Customize your Antenna view.
=> stats.tsv Check out the latest statistics in tsv format.
'''
    printPage(pagetext, output)
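
# Example of a generated entry line (illustrative values only):
#   => gemini://example.org/gemlog/hello.gmi 2023-06-01 Alice: Hello Geminispace
# A blank line precedes the first entry of each new date, so the page reads as
# one group per day.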

def generateAtomFeed(entries: Iterable[FeedEntry], output=None) -> None:
    """ Generate an Atom feed of all current entries. """
    pagetext = f'''<?xml version="1.0" encoding="UTF-8"?><feed xmlns="http://www.w3.org/2005/Atom">
<title>Antenna</title>
<id>gemini://warmedal.se/~antenna/</id>
<updated>{formatTime(None, '%Y-%m-%dT%H:%M:%SZ')}</updated>
<subtitle>Receiving transmissions from geminispace</subtitle>
<link href="gemini://warmedal.se/~antenna/" rel="alternate"></link>
<link href="gemini://warmedal.se/~antenna/atom.xml" rel="self"></link>
'''
    for entry in entries:
        timestamp = formatTime(entry.updated, '%Y-%m-%dT%H:%M:%SZ')
        pagetext += f'''  <entry>
    <id>{html.escape(entry.link)}</id>
    <title>{html.escape(entry.title)}</title>
    <updated>{timestamp}</updated>
    <link href="{html.escape(entry.link)}" rel="alternate"></link>
    <author>
      <name>{html.escape(entry.author)}</name>
    </author>
  </entry>
'''
    pagetext += "</feed>"
    printPage(pagetext, output)

def generateTwtxtPage(entries: Iterable[TwtxtEntry], output=None) -> None:
    """ Generate Antenna's twtxt page as text/gemini. """
    pagetext = '''# Antenna Twtxt
=> twtxt.txt Plain text version
'''
    for entry in entries:
        timestamp = formatTime(entry.posted, '%Y-%m-%dT%H:%M:%SZ')
        pagetext += f"{entry.author} {entry.feedurl}\n"
        pagetext += f"> {timestamp} {entry.twt}\n\n"
    pagetext += "\n=> cgi-bin/log Tail the log\n\n"
    pagetext += "> " + getsig() + "\n"
    printPage(pagetext, output)

def generateTwtxtFile(entries: Iterable[TwtxtEntry], output=None) -> None:
    """ Generate the plain-text twtxt file. """
    pagetext = ""
    for entry in entries:
        timestamp = formatTime(entry.posted, '%Y-%m-%dT%H:%M:%SZ')
        pagetext += f"{entry.author}\t{entry.feedurl}\t{timestamp}\t{entry.twt}\n"
    printPage(pagetext, output)
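
# Example of a generated line, with tabs written out as \t (illustrative values):
#   Alice\tgemini://example.org/twtxt.txt\t2023-06-01T12:00:00Z\tHello Geminispace
# The author and feed URL columns are prepended to the usual twtxt timestamp and
# twt columns.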

def generateIndex(db: AntennaDB,
                  feedPage: Optional[str] = None,
                  atomFeedPage: Optional[str] = None,
                  statFile: Optional[str] = None) -> None:
    """ Regenerate the front page, the Atom feed and the statistics file. """
    logEntries = db.getEntries()
    feedURLs = {entry.feedurl for entry in logEntries}
    generateFeedPage(logEntries, feedPage)
    generateAtomFeed(logEntries, atomFeedPage)
    updateStatFile(len(feedURLs), len(logEntries), statFile)

def generateTwtxt(db: AntennaDB,
                  twtxtPage: Optional[str] = None,
                  twtxtFile: Optional[str] = None) -> None:
    """ Regenerate the twtxt page and the plain twtxt file. """
    twts = db.getTwts()
    generateTwtxtPage(twts, twtxtPage)
    generateTwtxtFile(twts, twtxtFile)
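
# A minimal regeneration sketch, assuming an existing database file and writable
# output paths (the file names below are illustrative; nothing in this module
# requires them):
#
#   db = AntennaDB("antenna.sqlite")
#   generateIndex(db, feedPage="index.gmi", atomFeedPage="atom.xml", statFile="stats.tsv")
#   generateTwtxt(db, twtxtPage="twtxt.gmi", twtxtFile="twtxt.txt")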

def actionRefresh(args: Namespace) -> None:
    """ Fetch all queued feeds, update the database and regenerate the static pages. """

    def log(msg):
        """ Uniform log messages. """
        timestamp = datetime.utcnow().isoformat()
        with open(args.dataroot / "antenna.log", "a") as logfile:
            print(timestamp, msg, file=logfile)

    # *Phew* That was a lot! Time to get started on all those feeds.
    db = AntennaDB(args.dataroot / "antenna.sqlite")
    feedqueue = set(db.getQueue())

    # Let's not do anything if we don't have to (or really want to).
    if not feedqueue and not args.force:
        return

    log("INFO: validating feeds: " + str(feedqueue))
    uh = URLHelper(args.dataroot / "blocklist.txt")
    removefromqueue = []
    newFeedEntries = 0
    newTwtxtEntries = 0
    # Entries older than a week are ignored on insert and later pruned.
    agelimit = int(time.mktime(datetime.utcnow().utctimetuple())) - 3600*24*7

    for feedurl in feedqueue:
        # A path may contain '.' and '..' fragments; we'll have to resolve them.
        correctedfeedurl = uh.resolve(feedurl)
        if not uh.mightBeAURL(correctedfeedurl):
            log(f"ERROR: pretty sure '{feedurl}' is not a real URL...")
            removefromqueue.append(feedurl)
            continue

        # This is a bit messy, but we want to allow a few redirects and
        # still keep track of which URL we're actually fetching.
        response = None
        entries = []
        for _ in range(3):
            if uh.isBlocked(correctedfeedurl):
                log(f"ERROR: feed URL '{feedurl}' is blocked by rules.")
                removefromqueue.append(feedurl)
                # Make sure a previously fetched redirect response isn't parsed.
                response = None
                break
            try:
                response = gemcall.request(correctedfeedurl)
            except Exception:
                log(f"ERROR: failed to fetch feed from '{correctedfeedurl}'")
                break
            if response.responsecode in (30, 31):
                log(f"INFO: following redirect from '{correctedfeedurl}' "
                    f"to '{response.meta}'.")
                correctedfeedurl = uh.resolve(
                    urllib.parse.urljoin(correctedfeedurl, response.meta)
                )
            elif response.responsecode != 20:
                log(f"ERROR: bad response for feed '{correctedfeedurl}': "
                    f"{str(response.responsecode)} {response.meta}")
                removefromqueue.append(feedurl)
                response = None
                break
            else:
                break
        if not response:
            continue

        try:
            feed = response.read(200*1024).decode('UTF-8')
        except Exception:
            log(f"ERROR: failed to read socket at '{correctedfeedurl}'")
            continue
        log(f"INFO: fetched feed from '{correctedfeedurl}'")

        # Replace any previous entries from this feed with what we just fetched.
        db.deleteFeeds(correctedfeedurl)
        newFeedEntriesForFeed = 0
        newTwtxtEntriesForFeed = 0
        log(f"INFO: attempting to parse feed '{correctedfeedurl}' as gemlog feed")
        preliminaryEntries = (parsegemsub(feed, correctedfeedurl)
                              or parsexml(feed, correctedfeedurl))
        for entry in preliminaryEntries:
            if not uh.isBlocked(entry.link):
                entries.append(entry)
            else:
                log(f"ERROR: entry URL '{entry.link}' is blocked by rules.")
        if entries:
            newFeedEntriesForFeed = db.insertFeedEntries(entries, agelimit)
        else:
            log(f"INFO: attempting to parse feed '{correctedfeedurl}' as twtxt")
            newTwtxtEntriesForFeed = db.insertTwtxtEntries(
                parsetwtxt(feed, correctedfeedurl), agelimit
            )
        if newFeedEntriesForFeed == 0 and newTwtxtEntriesForFeed == 0:
            log(f"INFO: parsing feed '{correctedfeedurl}' did not result in new entries.")
        else:
            newFeedEntries += newFeedEntriesForFeed
            newTwtxtEntries += newTwtxtEntriesForFeed
        removefromqueue.append(feedurl)

    db.deleteFromQueue(*removefromqueue)

    # And last of all: regenerate the static pages.
    if not args.silent or args.force:
        # Remove entries older than 7 days from the database.
        db.pruneDB(agelimit)
        if newFeedEntries > 0 or args.force:
            log("INFO: regenerating feed page.")
            generateIndex(db, args.feedpage, args.atomfeedpage, args.statfile)
        if newTwtxtEntries > 0 or args.force:
            log("INFO: regenerating twtxt pages.")
            generateTwtxt(db, args.twtxtpage, args.twtxtfile)
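
# actionRefresh() expects an argparse Namespace; the argument parser itself lives
# outside this section, so the call below is only a hypothetical sketch of the
# attributes the function reads (dataroot, force, silent, feedpage, atomfeedpage,
# statfile, twtxtpage, twtxtfile):
#
#   from pathlib import Path
#   actionRefresh(Namespace(dataroot=Path("/srv/antenna"), force=True, silent=False,
#                           feedpage="index.gmi", atomfeedpage="atom.xml",
#                           statfile="stats.tsv", twtxtpage="twtxt.gmi",
#                           twtxtfile="twtxt.txt"))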