123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- #!/usr/bin/env python3
- # vim: tabstop=4 shiftwidth=4 expandtab
- import sqlite3
- from collections.abc import Iterable
- from contextlib import contextmanager
- from gemini_antenna.multiFeedParsing import FeedEntry, TwtxtEntry
- # There's a lot of opening and closing going on here,
- # because several processes will be sharing one sqlite3
- # db, which is just a single file. We want to hog it
- # as little as possible to minimize the risk of
- # collisions. Some errors are tolerable; this is a
- # good enough effort.
- class AntennaDB:
- def __init__(self, dbPath="antenna.sqlite"):
- self.dbPath = dbPath
- @contextmanager
- def cursor(self, readonly=False) -> sqlite3.Cursor:
- connection = sqlite3.connect(self.dbPath)
- cursor = connection.cursor()
- yield cursor
- if not readonly:
- connection.commit()
- connection.close()
- def createDB(self) -> None:
- """ Initialize the database. """
- with self.cursor() as cursor:
- cursor.execute("CREATE TABLE IF NOT EXISTS feedqueue (url text)")
- cursor.execute(
- "CREATE TABLE IF NOT EXISTS entries "
- "(feedurl text, author text, updated datetime, title text,"
- " link text primary key)"
- )
- cursor.execute(
- "CREATE TABLE IF NOT EXISTS twtxt "
- "(feedurl text, author text, posted datetime, twt text)"
- )
- def queueFeed(self, *urls: str) -> None:
- """ Add URLs to the feed queue. """
- urlTuples = [(url,) for url in urls]
- with self.cursor() as cursor:
- cursor.executemany("INSERT INTO feedqueue (url) VALUES (?)",
- urlTuples)
- def getQueue(self) -> list[str]:
- """
- :returns: List of URLs in the feed queue.
- """
- with self.cursor(readonly=True) as cursor:
- cursor.execute("SELECT * FROM feedqueue")
- results = []
- for result in cursor.fetchall():
- results.append(result[0])
- return results
- def getEntries(self) -> list[FeedEntry]:
- with self.cursor(readonly=True) as cursor:
- cursor.execute(
- "SELECT feedurl, author, updated, title, link FROM entries "
- "ORDER BY updated DESC"
- )
- results = []
- for result in cursor.fetchall():
- results.append(FeedEntry(feedurl = result[0],
- author = result[1],
- updated = result[2],
- title = result[3],
- link = result[4]))
- return results
- def getTwts(self) -> list[TwtxtEntry]:
- with self.cursor(readonly=True) as cursor:
- cursor.execute(
- "SELECT feedurl, author, posted, twt FROM twtxt "
- "ORDER BY posted DESC"
- )
- results = []
- for result in cursor.fetchall():
- results.append(TwtxtEntry(feedurl = result[0],
- author = result[1],
- posted = result[2],
- twt = result[3]))
- return results
- def deleteFeeds(self, *urls: str):
- """ DELETE entries with matching feed URLs from the database. """
- urlTuples = [(url,) for url in urls]
- with self.cursor() as cursor:
- cursor.executemany("DELETE FROM entries WHERE feedurl LIKE ?",
- urlTuples)
- cursor.executemany("DELETE FROM twtxt WHERE feedurl LIKE ?",
- urlTuples)
- def deleteFromQueue(self, *urls: str):
- """ DELETE feed URLs from feedqueue. """
- urlTuples = [(url,) for url in urls]
- with self.cursor() as cursor:
- cursor.executemany("DELETE FROM feedqueue WHERE url LIKE ?",
- urlTuples)
- def insertFeedEntries(self, entries: Iterable[FeedEntry], limit: int = 0) -> int:
- """
- INSERT entries into the DB, if they're not too old.
- :param limit: Maximum age timestamp.
- :returns: The number of entries inserted.
- """
- entries = [e for e in entries if e.updated > limit]
- entrytuples = []
- for entry in entries:
- entrytuples.append((entry.feedurl, entry.author, entry.updated,
- entry.title, entry.link))
- with self.cursor() as cursor:
- cursor.executemany(
- "INSERT INTO entries (feedurl, author, updated, title, link) "
- "VALUES (?,?,?,?,?) ON CONFLICT (link) DO UPDATE SET "
- "author = excluded.author, updated = excluded.updated, title = excluded.title",
- entrytuples
- )
- return len(entries)
- def insertTwtxtEntries(self, entries: Iterable[TwtxtEntry], limit: int = 0) -> int:
- """
- INSERT entries into the DB, if they're not too old.
- :param limit: Maximum age timestamp.
- :returns: The number of entries inserted.
- """
- entries = [e for e in entries if e.posted > limit]
- entrytuples = []
- for entry in entries:
- entrytuples.append((entry.feedurl, entry.author, entry.posted, entry.twt))
- with self.cursor() as cursor:
- cursor.executemany(
- "INSERT INTO twtxt (feedurl, author, posted, twt) "
- "VALUES (?,?,?,?)", entrytuples
- )
- return len(entries)
- def pruneDB(self, limit: int) -> None:
- """
- DELETE old articles from the database.
- :param limit: Maximum age timestamp.
- """
- with self.cursor() as cursor:
- cursor.execute("DELETE FROM entries WHERE updated < ?", (limit,))
- cursor.execute("DELETE FROM twtxt WHERE posted < ?", (limit,))
|