|
- import hashlib
- import re
- import sqlite3
- from freepost import random, settings
def _sha512(text):
    """Return the hex SHA-512 digest of ``text`` (UTF-8), or None for NULL.

    Registered as the SQL function SHA512 because SQLite has no built-in one.
    """
    if text is None:
        return None
    return hashlib.sha512(text.encode('UTF-8')).hexdigest()


def _regexp(pattern, string):
    """Back the SQL ``X REGEXP Y`` operator with a case-insensitive search.

    SQLite has no default regexp() function; when one is registered, the
    expression ``X REGEXP Y`` is evaluated as ``regexp(Y, X)``, hence the
    (pattern, string) argument order.

    A NULL operand yields NULL (None) instead of letting re.search() raise
    TypeError inside the callback.
    """
    if pattern is None or string is None:
        return None
    return re.search(pattern, string, flags=re.IGNORECASE) is not None


db = sqlite3.connect(settings['sqlite']['database'])

# Return rows as sqlite3.Row objects (addressable by column name) instead of
# plain tuples.
# https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.row_factory
db.row_factory = sqlite3.Row

db.create_function('SHA512', 1, _sha512)
db.create_function('REGEXP', 2, _regexp)
def new_session(user_id, session_token):
    """Store a new session for a user that has just logged in.

    The plain token is kept in the user's cookie; the database only receives
    its SHA-512 digest, computed by the SQL SHA512 function.
    """
    sql = """
        UPDATE user
        SET session = SHA512(:session)
        WHERE id = :user
    """
    params = {'session': session_token, 'user': user_id}

    with db:
        db.execute(sql, params)
def delete_session(user_id):
    """Clear the stored session hash of a user (logout)."""
    sql = """
        UPDATE user
        SET session = NULL
        WHERE id = :user
    """

    with db:
        db.execute(sql, {'user': user_id})
def check_user_credentials(username, password):
    """Check a username/password pair against active accounts.

    The stored password is compared with SHA512(password || salt), using the
    salt column of the candidate row.

    @return the matching user row, or None if the credentials are wrong or
            the account is not active
    """
    sql = """
        SELECT *
        FROM user
        WHERE username = :username
        AND password = SHA512(:password || salt)
        AND isActive = 1
    """
    credentials = {'username': username, 'password': password}

    with db:
        return db.execute(sql, credentials).fetchone()
def username_exists(username, case_sensitive=True):
    """Tell whether a user with this username is already registered.

    @param case_sensitive when False, usernames are compared through LOWER()
    @return None for an empty username, otherwise True/False
    """
    if not username:
        return None

    condition = (
        'WHERE username = :username'
        if case_sensitive
        else 'WHERE LOWER(username) = LOWER(:username)'
    )
    sql = """
        SELECT *
        FROM user
        """ + condition

    with db:
        row = db.execute(sql, {'username': username}).fetchone()

    return row is not None
def link_exists(link):
    """Find posts that already use the same link (duplicate detection).

    Links are compared case-insensitively through LOWER().

    @return a list of matching posts, newest first; empty if the link is
            empty or was never posted
    """
    if not link:
        return []

    sql = """
        SELECT *
        FROM post
        WHERE LOWER(link) = LOWER(:link)
        ORDER BY created DESC
    """

    with db:
        return db.execute(sql, {'link': link}).fetchall()
def new_user(username, password):
    """Create a new, active user account.

    The password is stored as SHA512(password || salt) together with the
    per-user salt. ``random`` is the project's freepost.random module.
    """
    # Public identifier of the new user
    hash_id = random.alphanumeric_string(10)

    # Per-user salt mixed into the password hash
    salt = random.ascii_string(16)

    sql = """
        INSERT INTO user (hashId, isActive, password, registered, salt, username)
        VALUES (:hash_id, 1, SHA512(:password || :salt), DATE(), :salt, :username)
    """
    account = {
        'username': username,
        'password': password,
        'salt': salt,
        'hash_id': hash_id,
    }

    with db:
        db.execute(sql, account)
def is_valid_session(token):
    """Return True if some user currently holds this session token."""
    user = get_user_by_session_token(token)
    return user is not None
def count_unread_messages(user_id):
    """Count unread replies addressed to a user.

    A reply counts when its parentUserId is the user, it was written by
    somebody else, and its `read` flag is still 0.
    """
    sql = """
        SELECT COUNT(1) AS new_messages
        FROM comment
        WHERE parentUserId = :user AND userId != :user AND `read` = 0
    """

    with db:
        row = db.execute(sql, {'user': user_id}).fetchone()

    return row['new_messages']
def get_user_by_username(username):
    """Fetch a user row by exact username.

    @return the user row, or None if the username is empty or unknown
    """
    if not username:
        return None

    sql = """
        SELECT *
        FROM user
        WHERE username = :username
    """

    with db:
        return db.execute(sql, {'username': username}).fetchone()
def get_user_by_session_token(session_token):
    """Fetch the user owning a session cookie token.

    The token is hashed with the SQL SHA512 function and compared with the
    stored session hash.

    @return the user row, or None if no session matches
    """
    sql = """
        SELECT *
        FROM user
        WHERE session = SHA512(:session)
    """

    with db:
        return db.execute(sql, {'session': session_token}).fetchone()
def get_posts(page=0, session_user_id=None, sort='hot', topic=None):
    """Return one page of posts for the homepage.

    @param page            zero-based page index
    @param session_user_id id of the logged-in user, used to fetch that
                           user's own vote on each post (user_vote)
    @param sort            'new' orders by creation time; any other value
                           orders by day, then votes, then comments count
    @param topic           when set, only posts tagged with this topic
    @return a list of post rows with username, user_vote and a
            space-separated ``topics`` column

    Because the topic filter is applied in WHERE before grouping, the
    ``topics`` column of a filtered page only contains the matching topic.
    """
    # Only fixed SQL fragments are formatted into the statement; every
    # caller-supplied value goes through bind parameters.
    if sort == 'new':
        order_by = 'ORDER BY P.created DESC'
    else:
        order_by = 'ORDER BY P.dateCreated DESC, P.vote DESC, P.commentsCount DESC'

    topic_filter = 'WHERE T.name = :topic' if topic else ''

    per_page = settings['defaults']['items_per_page']

    # The separator is a single-quoted string literal: a double-quoted " "
    # is parsed as an identifier first and is rejected by SQLite builds
    # that disable double-quoted string literals.
    sql = """
        SELECT P.*,
        U.username,
        V.vote AS user_vote,
        GROUP_CONCAT(T.name, ' ') AS topics
        FROM post AS P
        JOIN user AS U ON P.userId = U.id
        LEFT JOIN vote_post as V ON V.postId = P.id AND V.userId = :user
        LEFT JOIN topic as T ON T.post_id = P.id
        {topic}
        GROUP BY P.id
        {order}
        LIMIT :limit
        OFFSET :offset
    """.format(topic=topic_filter, order=order_by)

    params = {
        'user': session_user_id,
        'limit': per_page,
        'offset': page * per_page,
        'topic': topic,
    }

    with db:
        return db.execute(sql, params).fetchall()
def get_user_posts(user_id):
    """Return the 50 most recent posts submitted by a user."""
    sql = """
        SELECT *
        FROM post
        WHERE userId = :user
        ORDER BY created DESC
        LIMIT 50
    """

    with db:
        return db.execute(sql, {'user': user_id}).fetchall()
def get_user_comments(user_id):
    """Return the 50 most recent comments written by a user.

    Each row also carries the title and hashId of the commented post
    (postTitle, postHashId).
    """
    sql = """
        SELECT C.*,
        P.title AS postTitle,
        P.hashId AS postHashId
        FROM comment AS C
        JOIN post AS P ON P.id = C.postId
        WHERE C.userId = :user
        ORDER BY C.created DESC
        LIMIT 50
    """

    with db:
        return db.execute(sql, {'user': user_id}).fetchall()
def get_user_replies(user_id):
    """Return the 50 most recent replies other people wrote to a user.

    Selects comments whose parentUserId is the user and whose author is
    somebody else. Each row also carries the post title/hashId and the
    username of the reply's author.
    """
    sql = """
        SELECT C.*,
        P.title AS postTitle,
        P.hashId AS postHashId,
        U.username AS username
        FROM comment AS C
        JOIN post AS P ON P.id = C.postId
        JOIN user AS U ON U.id = C.userId
        WHERE C.parentUserId = :user AND C.userId != :user
        ORDER BY C.created DESC
        LIMIT 50
    """

    with db:
        return db.execute(sql, {'user': user_id}).fetchall()
def update_user(user_id, about, email, email_notifications, preferred_feed):
    """Update a user's profile settings and email address.

    Both statements run inside the same ``with db`` transaction.
    """
    profile_sql = """
        UPDATE user
        SET about = :about,
        email_notifications = :notifications,
        preferred_feed = :preferred_feed
        WHERE id = :user
    """
    profile = {
        'user': user_id,
        'about': about,
        'notifications': email_notifications,
        'preferred_feed': preferred_feed,
    }

    # The address is stored lower-cased so that two users cannot register
    # the same address with different case. OR IGNORE skips the update
    # instead of raising a "duplicate key" error when the address is
    # already taken.
    email_sql = """
        UPDATE OR IGNORE user
        SET email = LOWER(:email)
        WHERE id = :user
    """

    with db:
        # Everything except the email address
        db.execute(profile_sql, profile)
        db.execute(email_sql, {'user': user_id, 'email': email})
def set_replies_as_read(user_id):
    """Flag every unread reply addressed to a user as read."""
    sql = """
        UPDATE comment
        SET `read` = 1
        WHERE parentUserId = :user AND `read` = 0
    """

    with db:
        db.execute(sql, {'user': user_id})
def new_post(title, link, text, user_id):
    """Submit a new post/link.

    The post starts with 0 votes and 0 comments and is timestamped by the
    database.

    @return the hashId generated for the new post
    """
    hash_id = random.alphanumeric_string(10)

    sql = """
        INSERT INTO post (hashId, created, dateCreated, title,
        link, text, vote, commentsCount, userId)
        VALUES (:hash_id, DATETIME(), DATE(), :title, :link,
        :text, 0, 0, :user)
    """
    values = {
        'user': user_id,
        'title': title,
        'link': link,
        'text': text,
        'hash_id': hash_id,
    }

    with db:
        db.execute(sql, values)

    return hash_id
def replace_post_topics(post_id, topics=''):
    """Replace the topics of a post with a whitespace-separated list.

    @param post_id id of the post
    @param topics  string of topic names separated by whitespace

    If ``topics`` is empty or contains no names, the function returns early
    and the existing topics are left untouched.
    """
    if not topics:
        return

    # Normalize: split on any whitespace (this drops empty strings), lower
    # case, and remove duplicates while keeping first-seen order. Removing
    # duplicates before truncation stops a repeated name from using up a
    # slot of the per-post limit.
    names = list(dict.fromkeys(name.lower() for name in topics.split()))

    if not names:
        return

    # Remove extra topics if the list is too long
    names = names[:settings['defaults']['topics_per_post']]

    with db:
        # First delete the existing topics
        db.execute(
            """
            DELETE
            FROM topic
            WHERE post_id = :post
            """,
            {'post': post_id},
        )

        # Now insert the new topics.
        # IGNORE duplicates that trigger the UNIQUE constraint.
        db.executemany(
            """
            INSERT OR IGNORE INTO topic (post_id, name)
            VALUES (?, ?)
            """,
            [(post_id, name) for name in names],
        )
def get_post(hash, session_user_id=None):
    """Fetch a single post by its hashId.

    @param hash            hashId of the post
    @param session_user_id id of the logged-in user, used to fetch that
                           user's own vote (user_vote)
    @return the post row with author username and user_vote, or None
    """
    sql = """
        SELECT P.*,
        U.username,
        V.vote AS user_vote
        FROM post AS P
        JOIN user AS U ON P.userId = U.id
        LEFT JOIN vote_post as V ON V.postId = P.id AND V.userId = :user
        WHERE P.hashId = :post
    """
    params = {'post': hash, 'user': session_user_id}

    with db:
        return db.execute(sql, params).fetchone()
def update_post(title, link, text, post_hash_id, user_id):
    """Update title, link and text of a post.

    The row is only updated when the post's userId equals ``user_id``.
    """
    sql = """
        UPDATE post
        SET title = :title,
        link = :link,
        text = :text
        WHERE hashId = :hash_id
        AND userId = :user
    """
    values = {
        'hash_id': post_hash_id,
        'user': user_id,
        'title': title,
        'link': link,
        'text': text,
    }

    with db:
        db.execute(sql, values)
def get_post_comments(post_id, session_user_id=None):
    """Return all comments of a post, best voted first, then oldest first.

    Each row carries the author's username and the logged-in user's own
    vote on the comment (user_vote).
    """
    sql = """
        SELECT C.*,
        U.username,
        V.vote AS user_vote
        FROM comment AS C
        JOIN user AS U ON C.userId = U.id
        LEFT JOIN vote_comment as V ON V.commentId = C.id AND V.userId = :user
        WHERE C.postId = :post
        ORDER BY C.vote DESC,
        C.created ASC
    """
    params = {'post': post_id, 'user': session_user_id}

    with db:
        return db.execute(sql, params).fetchall()
def get_post_topics(post_id):
    """Return the topic names of a post, sorted alphabetically."""
    sql = """
        SELECT T.name
        FROM topic AS T
        WHERE T.post_id = :post
        ORDER BY T.name ASC
    """

    with db:
        return db.execute(sql, {'post': post_id}).fetchall()
def new_comment(comment_text, post_hash_id, user_id, parent_user_id=None, parent_comment_id=None):
    """Submit a new comment to a post and bump the post's comment counter.

    Both statements run inside the same ``with db`` transaction.

    @return the hashId generated for the new comment
    @raise TypeError if no post matches ``post_hash_id`` (get_post returns
           None, which cannot be subscripted)
    """
    hash_id = random.alphanumeric_string(10)

    # The comment table references the post by numeric id, not by hashId
    post = get_post(post_hash_id)
    post_id = post['id']

    insert_sql = """
        INSERT INTO comment (hashId, created, dateCreated, `read`, text, vote,
        parentId, parentUserId, postId, userId)
        VALUES (:hash_id, DATETIME(), DATE(), 0, :text, 0, :parent_id,
        :parent_user_id, :post_id, :user)
    """
    comment = {
        'hash_id': hash_id,
        'text': comment_text,
        'user': user_id,
        'post_id': post_id,
        'parent_id': parent_comment_id,
        'parent_user_id': parent_user_id,
    }

    counter_sql = """
        UPDATE post
        SET commentsCount = commentsCount + 1
        WHERE id = :post
    """

    with db:
        db.execute(insert_sql, comment)
        db.execute(counter_sql, {'post': post_id})

    return hash_id
def get_comment(hash_id, session_user_id=None):
    """Fetch a single comment by its hashId.

    The row also carries the post's hashId/title, the author's username and
    the logged-in user's own vote (user_vote).

    @return the comment row, or None if no comment matches
    """
    sql = """
        SELECT C.*,
        P.hashId AS postHashId,
        P.title AS postTitle,
        U.username,
        V.vote AS user_vote
        FROM comment AS C
        JOIN user AS U ON C.userId = U.id
        JOIN post AS P ON P.id = C.postId
        LEFT JOIN vote_comment as V ON V.commentId = C.id AND V.userId = :user
        WHERE C.hashId = :comment
    """
    params = {'comment': hash_id, 'user': session_user_id}

    with db:
        return db.execute(sql, params).fetchone()
def get_latest_comments():
    """Return the 50 newest comments across all posts (highest id first).

    Each row carries the post's hashId/title and the author's username.
    """
    sql = """
        SELECT C.*,
        P.hashId AS postHashId,
        P.title AS postTitle,
        U.username
        FROM comment AS C
        JOIN user AS U ON C.userId = U.id
        JOIN post AS P ON P.id = C.postId
        ORDER BY C.id DESC
        LIMIT 50
    """

    with db:
        return db.execute(sql).fetchall()
def update_comment(text, comment_hash_id, user_id):
    """Replace the text of a comment.

    The row is only updated when the comment's userId equals ``user_id``.
    """
    sql = """
        UPDATE comment
        SET text = :text
        WHERE hashId = :comment AND userId = :user
    """
    values = {'comment': comment_hash_id, 'user': user_id, 'text': text}

    with db:
        db.execute(sql, values)
def vote_post(post_id, user_id, vote):
    """Add or update a user's vote on a post.

    @param vote amount added to both the user's vote and the post total
                (+1 or -1)

    All three statements run inside the same ``with db`` transaction.
    """
    voter = {'post': post_id, 'user': user_id}

    ensure_row_sql = """
        INSERT OR IGNORE INTO vote_post (vote, datetime, postId, userId)
        VALUES (0, DATETIME(), :post, :user)
    """
    user_vote_sql = """
        UPDATE vote_post
        SET vote = vote + :vote
        WHERE postId = :post AND userId = :user
    """
    post_total_sql = """
        UPDATE post
        SET vote = vote + :vote
        WHERE id = :post
    """

    with db:
        # Create a neutral vote row for this post, if one doesn't exist yet
        db.execute(ensure_row_sql, voter)

        # Apply the change to the user's own vote
        db.execute(user_vote_sql, dict(voter, vote=vote))

        # Apply the same change to the post's total
        db.execute(post_total_sql, {'vote': vote, 'post': post_id})
def vote_comment(comment_id, user_id, vote):
    """Add or update a user's vote on a comment.

    @param vote amount added to both the user's vote and the comment total
                (+1 or -1)

    All three statements run inside the same ``with db`` transaction.
    """
    with db:
        # Create a neutral vote row for this comment, if one doesn't already
        # exist. OR IGNORE (as in vote_post) keeps a repeated vote by the
        # same user from failing or duplicating the row.
        db.execute(
            """
            INSERT OR IGNORE INTO vote_comment (vote, datetime, commentId, userId)
            VALUES (0, DATETIME(), :comment, :user)
            """,
            {
                'comment': comment_id,
                'user': user_id,
            },
        )

        # Update user vote (+1 or -1)
        db.execute(
            """
            UPDATE vote_comment
            SET vote = vote + :vote
            WHERE commentId = :comment AND userId = :user
            """,
            {
                'vote': vote,
                'comment': comment_id,
                'user': user_id,
            },
        )

        # Update comment's total
        db.execute(
            """
            UPDATE comment
            SET vote = vote + :vote
            WHERE id = :comment
            """,
            {
                'vote': vote,
                'comment': comment_id,
            },
        )
def search(query, sort='newest', page=0):
    """Search posts whose title contains any of the words in ``query``.

    @param query words separated by whitespace; matched case-insensitively
    @param sort  'points' orders by votes; any other value (including the
                 default 'newest') orders by creation time
    @param page  zero-based page index
    @return a list of post rows with the author's username; empty list if
            the query is empty or blank
    """
    if not query:
        return []

    words = query.split()

    if not words:
        return []

    # Build an alternation for the REGEXP operator. Each word is escaped so
    # that user input such as "c++" or "(" is matched literally instead of
    # being parsed as a regular expression (which could raise re.error).
    pattern = '|'.join(re.escape(word) for word in words)

    # ORDER BY cannot take a bind parameter, so only fixed, known fragments
    # are formatted into the statement; the caller's value never is.
    order_by = 'P.vote DESC' if sort == 'points' else 'P.created DESC'

    per_page = settings['defaults']['search_results_per_page']

    sql = """
        SELECT P.*,
        U.username
        FROM post AS P
        JOIN user AS U ON P.userId = U.id
        WHERE P.title REGEXP :query
        ORDER BY {sort}
        LIMIT :limit
        OFFSET :offset
    """.format(sort=order_by)

    params = {
        'query': pattern,
        'limit': per_page,
        'offset': page * per_page,
    }

    with db:
        return db.execute(sql, params).fetchall()
def set_password_reset_token(user_id=None, token=None):
    """Store a password reset token for a user, valid for one hour.

    Only the SHA-512 digest of the token is stored. Does nothing when
    ``user_id`` or ``token`` is missing.
    """
    if not (user_id and token):
        return

    sql = """
        UPDATE user
        SET passwordResetToken = SHA512(:token),
        passwordResetTokenExpire = DATETIME('now', '+1 HOUR')
        WHERE id = :user
    """

    with db:
        db.execute(sql, {'token': token, 'user': user_id})
def delete_password_reset_token(user_id=None):
    """Remove the password reset token and its expiry date from a user."""
    sql = """
        UPDATE user
        SET passwordResetToken = NULL,
        passwordResetTokenExpire = NULL
        WHERE id = :user
    """

    with db:
        db.execute(sql, {'user': user_id})
def is_password_reset_token_valid(user_id=None):
    """Tell whether a user has a password reset token that has not expired.

    @return True if the token and its expiry are set and the expiry is
            later than the current DATETIME('now'); False otherwise
    """
    sql = """
        SELECT COUNT(1) AS valid
        FROM user
        WHERE id = :user
        AND passwordResetToken IS NOT NULL
        AND passwordResetTokenExpire IS NOT NULL
        AND passwordResetTokenExpire > DATETIME('now')
    """

    with db:
        row = db.execute(sql, {'user': user_id}).fetchone()

    return row['valid'] == 1
def reset_password(username=None, email=None, new_password=None, secret_token=None):
    """Set a new password for a user holding a valid reset token.

    The password is only changed when username, email and the SHA-512 digest
    of ``secret_token`` all match the same row and the token has not
    expired. On success the reset token and its expiry are cleared.
    Does nothing when ``new_password`` is empty.
    """
    if not new_password:
        return

    with db:
        # The expiry is stored as DATETIME('now', '+1 HOUR'), so it must be
        # compared with DATETIME('now'). Comparing with DATE() would keep
        # the token valid until the end of the day, because
        # 'YYYY-MM-DD HH:MM:SS' always sorts after 'YYYY-MM-DD'.
        db.execute(
            """
            UPDATE user
            SET password = SHA512(:password || salt),
            passwordResetToken = NULL,
            passwordResetTokenExpire = NULL
            WHERE username = :user
            AND email = :email
            AND passwordResetToken = SHA512(:token)
            AND passwordResetTokenExpire > DATETIME('now')
            """,
            {
                'password': new_password,
                'user': username,
                'email': email,
                'token': secret_token,
            },
        )
|