""" Rss Feed Plugin to get regular updates from Feed """
# Copyright (C) 2020-2021 by UsergeTeam@Github, < https://github.com/UsergeTeam >.
#
# This file is part of < https://github.com/UsergeTeam/Userge > project,
# and is released under the "GNU v3.0 License Agreement".
# Please see < https://github.com/UsergeTeam/Userge/blob/master/LICENSE >
#
# All rights reserved.
import asyncio
import os
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import feedparser
import wget
from dateutil import parser
from pyrogram.errors import (
ChatWriteForbidden, ChannelPrivate, UserNotParticipant, ChatIdInvalid
)
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from userge import userge, Message, Config, logging, get_collection, pool
from userge.utils.exceptions import UsergeBotNotFound
RSS_CHAT_ID = [int(x) for x in os.environ.get("RSS_CHAT_ID", str(Config.LOG_CHANNEL_ID)).split()]
_LOG = logging.getLogger(__name__)
RSS_DICT: Dict[str, List[datetime]] = {}
RSS_COLLECTION = get_collection("RSS_FEED") # Changed Collection Name cuz of Messsssss
TASK_RUNNING = False
async def _init():
async for i in RSS_COLLECTION.find():
RSS_DICT[i['url']] = [i['published'], None]
async def add_new_feed(url: str, l_u: str) -> str:
if url in RSS_DICT:
out_str = "`Url is matched in Existing Feed Database.`"
else:
pub, now = _parse_time(l_u)
out_str = f"""
#ADDED_NEW_FEED_URL
\t\t**FEED URL:** `{url}`
\t\t**LAST UPDATED:** `{pub}`
"""
RSS_DICT[url] = [pub, now]
if not TASK_RUNNING:
asyncio.get_event_loop().create_task(rss_worker())
await RSS_COLLECTION.update_one({'url': url}, {"$set": {'published': pub}}, upsert=True)
return out_str
async def delete_feed(url: str) -> str:
if url in RSS_DICT:
out_str = f"""
#DELETED_FEED_URL
\t\t**FEED_URL:** `{url}`
"""
del RSS_DICT[url]
await RSS_COLLECTION.delete_one({'url': url})
else:
out_str = "`This Url is not in my database.`"
return out_str
async def send_new_post(entries):
title = entries.get('title')
link = entries.get('link')
time = entries.get('published')
thumb = None
author = None
author_link = None
thumb_url = entries.get('media_thumbnail')
if thumb_url:
thumb_url = thumb_url[0].get('url')
thumb = os.path.join(Config.DOWN_PATH, f"{title}.{str(thumb_url).split('.')[-1]}")
if not os.path.exists(thumb):
await pool.run_in_thread(wget.download)(thumb_url, thumb)
if time:
time = _parse_time(time)[0]
if entries.get('authors'):
author = entries.get('authors')[0]['name'].split('/')[-1]
author_link = entries.get('authors')[0]['href']
out_str = f"""
**New post Found**
**Title:** `{title}`
**Author:** [{author}]({author_link})
**Last Updated:** `{time}`
"""
markup = InlineKeyboardMarkup([[InlineKeyboardButton(text="View Post Online", url=link)]])
if thumb:
args = {
'caption': out_str,
'parse_mode': "md",
'reply_markup': markup if userge.has_bot else None
}
else:
args = {
'text': out_str,
'disable_web_page_preview': True,
'parse_mode': "md",
'reply_markup': markup if userge.has_bot else None
}
for chat_id in RSS_CHAT_ID:
args.update({'chat_id': chat_id})
try:
await send_rss_to_telegram(userge.bot, args, thumb)
except (
ChatWriteForbidden, ChannelPrivate, ChatIdInvalid,
UserNotParticipant, UsergeBotNotFound
):
out_str += f"\n\n[View Post Online]({link})"
if 'caption' in args:
args.update({'caption': out_str})
else:
args.update({'text': out_str})
await send_rss_to_telegram(userge, args, thumb)
async def send_rss_to_telegram(client, args: dict, path: str = None):
if path:
if path.lower().endswith((".jpg", ".jpeg", ".png", ".bmp")):
await client.send_photo(photo=path, **args)
elif path.lower().endswith((".mkv", ".mp4", ".webm")):
await client.send_video(video=path, **args)
else:
await client.send_document(document=path, **args)
else:
await client.send_message(**args)
@userge.on_cmd("addfeed", about={
'header': "Add new Feed URL",
'description': "Add new Feed URL to get regular updates from it.",
'usage': "{tr}addfeed url"})
async def add_rss_feed(msg: Message):
""" Add a New feed URL """
if len(RSS_DICT) >= 10:
return await msg.edit("`Sorry, but not allowing to add urls more than 10.`")
if not msg.input_str:
return await msg.err("Feed url not found!")
try:
rss = await _parse(msg.input_str)
except IndexError:
return await msg.edit("The link does not seem to be a RSS feed or is not supported")
out_str = await add_new_feed(msg.input_str, rss.entries[0]['published'])
await msg.edit(out_str, log=__name__)
@userge.on_cmd("delfeed", about={
'header': "Delete an existing Feed URL",
'description': "Delete an existing Feed URL from Database.",
'flags': {'-all': 'Delete All Urls'},
'usage': "{tr}delfeed url"})
async def delete_rss_feed(msg: Message):
""" Delete to a existing Feed Url """
if msg.flags and '-all' in msg.flags:
RSS_DICT.clear()
await RSS_COLLECTION.drop()
return await msg.edit("`Deleted All feeds Successfully...`")
if not msg.input_str:
return await msg.err("Feed url not found!")
out_str = await delete_feed(msg.input_str)
await msg.edit(out_str, log=__name__)
@userge.on_cmd("listrss", about={
'header': "List all feed URLs that you Subscribed.",
'usage': "{tr}listrss"})
async def list_rss_feed(msg: Message):
""" List all Subscribed Feeds """
out_str = ""
for url, date in RSS_DICT.items():
out_str += f"**FEED URL:** `{url}`"
out_str += f"\n**LAST CHECKED:** `{date[1]}`\n\n"
if not out_str:
out_str = "`No feed Url Found.`"
await msg.edit(out_str)
@userge.add_task
async def rss_worker():
global TASK_RUNNING # pylint: disable=global-statement
TASK_RUNNING = True
chunk = 20
if RSS_DICT and RSS_CHAT_ID[0] == Config.LOG_CHANNEL_ID:
_LOG.info(
"You have to add var for `RSS_CHAT_ID`, for Now i will send in LOG_CHANNEL")
while RSS_DICT:
_LOG.debug("Running RSS Worker Background ...")
for url in RSS_DICT:
rss = await _parse(url)
if len(rss.entries) > chunk:
entries = reversed(rss.entries[:chunk])
else:
entries = reversed(rss.entries)
for entry in entries:
pub, now = _parse_time(entry['published'])
if pub <= RSS_DICT[url][0]:
RSS_DICT[url][1] = now
continue
await send_new_post(entry)
if url not in RSS_DICT:
break
RSS_DICT[url] = [pub, now]
await RSS_COLLECTION.update_one({'url': url}, {"$set": {'published': pub}})
await asyncio.sleep(1)
await asyncio.sleep(5)
await asyncio.sleep(60)
TASK_RUNNING = False
def _parse_time(t: str) -> Tuple[datetime, datetime]:
_delta = timedelta(hours=5, minutes=30)
parsed_time = (parser.parse(t) + _delta).replace(tzinfo=None)
datetime_now = datetime.utcnow() + _delta
return parsed_time, datetime_now
@pool.run_in_thread
def _parse(url: str) -> None:
return feedparser.parse(url)