ShareGPT
GodOfThunder
•
1y ago
•
100%
Repost Youtube Feed
# main.py
from typing import Optional
import backoff
import feedparser
import logging
import requests
import sqlite3
from config import (
USERNAME,
PASSWORD,
LEMMY_INSTANCE_URL,
COMMUNITY_NAME,
POSTING_DB_FILE,
MAX_BACKOFF_TIME,
INVIDIOUS_FEED_URL
)
from lemmy import LemmyAPI
from db import Database
from utils import (
initialize_logger,
format_video_title,
get_latest_videos
)
def main() -> None:
initialize_logger()
db = Database(POSTING_DB_FILE)
lemmy = LemmyAPI(USERNAME, PASSWORD, LEMMY_INSTANCE_URL)
community_id = lemmy.get_community_id(COMMUNITY_NAME)
feed = get_latest_videos()
for entry in feed.entries:
process_video(db, lemmy, community_id, entry)
def process_video(
db: Database,
lemmy: LemmyAPI,
community_id: int,
entry: feedparser.FeedParserDict
) -> None:
if db.video_posted(entry.yt_videoid):
logging.info("Video already posted")
return
author = entry.author_detail.name
title = entry.title
post_video(db, lemmy, community_id, entry.yt_videoid, author, title)
@backoff.on_exception(backoff.expo,
(requests.exceptions.RequestException, TypeError),
max_time=MAX_BACKOFF_TIME)
def post_video(
db: Database,
lemmy: LemmyAPI,
community_id: int,
video_id: str,
author: str,
title: str
) -> None:
truncated_title = format_video_title(author, title)
url = f"https://www.youtube.com/watch?v={video_id}"
post_id = lemmy.create_post(community_id, truncated_title, url)
if post_id:
db.insert_posted_video(video_id, post_id, title)
lemmy.lock_post(post_id, True)
else:
logging.warning("Failed to post video")
# lemmy.py
from typing import Optional
from pythorhead import Lemmy
class LemmyAPI:
def __init__(self, username: str, password: str, instance_url: str) -> None:
self.lemmy = Lemmy(instance_url)
self.lemmy.log_in(username, password)
def get_community_id(self, community_name: str) -> Optional[int]:
return self.lemmy.discover_community(community_name)
def create_post(self, community_id: int, title: str, url: str) -> Optional[int]:
return self.lemmy.post.create(community_id, title, url=url)["post_view"]["post"]["id"]
def lock_post(self, post_id: int, lock: bool) -> None:
self.lemmy.post.lock(post_id, lock)
# db.py
import sqlite3
from typing import Tuple, Any, Optional
class Database:
def __init__(self, db_file: str) -> None:
self.conn = sqlite3.connect(db_file)
self.create_table()
def create_table(self) -> None:
self.conn.execute("""
CREATE TABLE IF NOT EXISTS posted_videos (
video_id TEXT PRIMARY KEY,
post_id INTEGER,
title TEXT,
timestamp TEXT
)
""")
def execute_query(self, query: str, params: Tuple[Any, ...] = ()) -> sqlite3.Cursor:
cursor = self.conn.cursor()
cursor.execute(query, params)
self.conn.commit()
return cursor
def video_posted(self, video_id: str) -> bool:
query = """
SELECT EXISTS (
SELECT 1
FROM posted_videos
WHERE video_id = ?
)
"""
cursor = self.execute_query(query, (video_id,))
return bool(cursor.fetchone()[0] == 1)
def insert_posted_video(self, video_id: str, post_id: int, title: str) -> None:
timestamp = datetime.datetime.now().isoformat()
query = """
INSERT INTO posted_videos (video_id, post_id, title, timestamp)
VALUES (?, ?, ?, ?)
"""
self.execute_query(query, (video_id, post_id, title, timestamp))
# utils.py
import feedparser
import logging
def initialize_logger() -> None:
logging.basicConfig(level=logging.INFO,
filename="log.txt",
filemode="w",
format='%(name)s - %(levelname)s - %(message)s')
def format_video_title(author: str, title: str) -> str:
if len(f"{author} — {title}") > 200:
return title[:200]
return f"{author} — {title}"[:200]
def get_latest_videos() -> feedparser.FeedParserDict:
return feedparser.parse(INVIDIOUS_FEED_URL)
Comments 0