mirror of
https://github.com/cupcakearmy/mercatus.git
synced 2024-11-01 08:14:10 +01:00
major rewrite, use conversation handler for settings, user can set interval for getting notifications
This commit is contained in:
parent
224c7fafa7
commit
59d5210372
112
src/Commands.py
112
src/Commands.py
@ -1,112 +0,0 @@
|
||||
import asyncio
|
||||
from asyncio import sleep
|
||||
from datetime import datetime, timedelta
|
||||
from telegram.ext import CallbackContext
|
||||
from telegram import Update, ParseMode
|
||||
|
||||
from Background import interval
|
||||
from LimitedList import LimitedList
|
||||
from Market import Market
|
||||
from Utils import parse_command, config, Section, persistence, updater
|
||||
|
||||
|
||||
def get_watchlist(context: CallbackContext) -> LimitedList:
|
||||
return LimitedList(context.user_data.setdefault(Section.Watchlist.value, []),
|
||||
config[Section.Watchlist.value]['max_items'])
|
||||
|
||||
|
||||
def watchlist_add(update: Update, context: CallbackContext):
|
||||
value, *rest = parse_command(update)
|
||||
get_watchlist(context).add(value)
|
||||
update.message.reply_text('Saved 💾')
|
||||
|
||||
|
||||
def watchlist_delete(update: Update, context: CallbackContext):
|
||||
value, *rest = parse_command(update)
|
||||
update.message.reply_text('Deleted 🗑' if get_watchlist(context).delete(value) else 'Not found ❓')
|
||||
|
||||
|
||||
def watchlist_all(update: Update, context: CallbackContext):
|
||||
items = get_watchlist(context).all()
|
||||
update.message.reply_text('\n'.join(items) if len(items) > 0 else 'Your list is empty 📭')
|
||||
|
||||
|
||||
def watchlist_clear(update: Update, context: CallbackContext):
|
||||
get_watchlist(context).clear()
|
||||
update.message.reply_text('Cleared 🧼')
|
||||
|
||||
|
||||
def set_api_key(update: Update, context: CallbackContext):
|
||||
value, *rest = parse_command(update)
|
||||
context.user_data[Section.API_Key.value] = value
|
||||
update.message.reply_text('API key saved 🔑')
|
||||
|
||||
|
||||
def get_api_key(update: Update, context: CallbackContext):
|
||||
update.message.reply_text(context.user_data.get(Section.API_Key.value, 'API Key not set ⛔️'))
|
||||
|
||||
|
||||
def data(update: Update, context: CallbackContext):
|
||||
id = update.message.chat_id
|
||||
delta = datetime.now() - timedelta(days=365 * 1)
|
||||
asyncio.run(send_update_to_user(user=id, delta=delta))
|
||||
|
||||
|
||||
def start(update: Update, context: CallbackContext):
|
||||
update.message.reply_markdown("""*Welcome! 👋*
|
||||
|
||||
*1.* First you will need to get a (free) api token for the stock data.
|
||||
[https://www.alphavantage.co/support/#api-key](Alphavantage Key 🔑)
|
||||
|
||||
*2.* Then enter it by sending the code to me with `/setKey myApiCode`
|
||||
|
||||
*3.* Add stocks or ETFs to your `/list` by going to [https://finance.yahoo.com/](Yahoo Finance 📈) and the sending it to `/add`
|
||||
_Example_ For Apple `/add AAPL`
|
||||
|
||||
Enjoy 🚀
|
||||
""")
|
||||
|
||||
|
||||
async def send_update_to_user(user: str, delta: datetime):
|
||||
try:
|
||||
user_data = persistence.get_user_data()[user]
|
||||
running = user_data.setdefault(Section.Running.value, False)
|
||||
if Section.API_Key.value not in user_data:
|
||||
updater.bot.send_message(user, text='API Key not set ⛔️')
|
||||
return
|
||||
|
||||
if running:
|
||||
updater.bot.send_message(user, text='Already running 🏃')
|
||||
return
|
||||
|
||||
user_data[Section.Running.value] = True
|
||||
print('Sending updates to {}'.format(user))
|
||||
market = Market(user_data[Section.API_Key.value])
|
||||
updater.bot.send_message(user, text='Getting updates 🌎')
|
||||
first = True
|
||||
for item in user_data.get(Section.Watchlist.value, []):
|
||||
if first:
|
||||
first = False
|
||||
else:
|
||||
msg = updater.bot.send_message(user, text='Waiting 60 seconds for API... ⏳')
|
||||
await sleep(60)
|
||||
msg.delete()
|
||||
|
||||
msg = updater.bot.send_message(user, text='Calculating {}... ⏳'.format(item))
|
||||
chart = market.get_wma(item, delta)
|
||||
msg.delete()
|
||||
updater.bot.send_message(user, text='*{}*'.format(item), parse_mode=ParseMode.MARKDOWN)
|
||||
updater.bot.send_photo(user, photo=chart)
|
||||
|
||||
except:
|
||||
updater.bot.send_message(user, text='There was an error ⚠️')
|
||||
finally:
|
||||
user_data[Section.Running.value] = False
|
||||
|
||||
|
||||
@interval(every=3 * 60 * 60.0, autorun=False, isolated=True)
|
||||
async def send_updates():
|
||||
delta = datetime.now() - timedelta(days=365 * 1)
|
||||
|
||||
for key in persistence.get_user_data().keys():
|
||||
await send_update_to_user(user=key, delta=delta)
|
121
src/commands/config.py
Normal file
121
src/commands/config.py
Normal file
@ -0,0 +1,121 @@
|
||||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardRemove, ParseMode
|
||||
from telegram.ext import CommandHandler, MessageHandler, Filters, ConversationHandler, CallbackQueryHandler, CallbackContext
|
||||
|
||||
from utils import Section
|
||||
|
||||
MENU, API_KEY, FREQUENCY = range(3)
|
||||
|
||||
|
||||
def show_menu(update: Update, context: CallbackContext):
|
||||
keyboard = [
|
||||
[InlineKeyboardButton('API Key', callback_data=API_KEY)],
|
||||
[InlineKeyboardButton('Frequency', callback_data=FREQUENCY)],
|
||||
[InlineKeyboardButton('Done', callback_data=ConversationHandler.END)],
|
||||
]
|
||||
update.effective_user.send_message(
|
||||
'_Current settings:_\n'
|
||||
f'API Key: *{context.user_data[Section.API_Key.value]}*\n'
|
||||
f'Frequency: *{context.user_data[Section.Frequency.value]}*\n'
|
||||
'\nWhat settings do you want to configure?',
|
||||
parse_mode=ParseMode.MARKDOWN,
|
||||
reply_markup=InlineKeyboardMarkup(keyboard, one_time_keyboard=True)
|
||||
)
|
||||
|
||||
return MENU
|
||||
|
||||
|
||||
def show_menu_api_key(update: Update, context: CallbackContext):
|
||||
update.effective_user.send_message(
|
||||
'Send me your API Key 🙂'
|
||||
'\nor /cancel',
|
||||
reply_markup=ReplyKeyboardRemove()
|
||||
)
|
||||
return API_KEY
|
||||
|
||||
|
||||
def show_menu_frequency(update: Update, context: CallbackContext):
|
||||
keyboard = [
|
||||
[InlineKeyboardButton('2 minutes', callback_data='2m'), InlineKeyboardButton(
|
||||
'30 minutes', callback_data='30m')],
|
||||
[InlineKeyboardButton('hour', callback_data='1h'), InlineKeyboardButton(
|
||||
'4 hours', callback_data='4h')],
|
||||
[InlineKeyboardButton('12 hours', callback_data='12h'), InlineKeyboardButton(
|
||||
'day', callback_data='1d')],
|
||||
[InlineKeyboardButton('3 days', callback_data='3d'), InlineKeyboardButton(
|
||||
'week', callback_data='1w')],
|
||||
[InlineKeyboardButton('Cancel', callback_data='cancel')],
|
||||
]
|
||||
update.effective_user.send_message(
|
||||
'Send me updates every: ⬇',
|
||||
reply_markup=InlineKeyboardMarkup(keyboard)
|
||||
)
|
||||
|
||||
return FREQUENCY
|
||||
|
||||
|
||||
def config(update: Update, context: CallbackContext):
|
||||
context.bot.delete_message(
|
||||
chat_id=update.message.chat_id,
|
||||
message_id=update.message.message_id,
|
||||
)
|
||||
return show_menu(update, context)
|
||||
|
||||
|
||||
def menu(update: Update, context: CallbackContext):
|
||||
selected = int(update.callback_query.data)
|
||||
|
||||
context.bot.delete_message(
|
||||
chat_id=update.callback_query.message.chat_id,
|
||||
message_id=update.callback_query.message.message_id,
|
||||
)
|
||||
|
||||
if selected == API_KEY:
|
||||
return show_menu_api_key(update, context)
|
||||
elif selected == FREQUENCY:
|
||||
return show_menu_frequency(update, context)
|
||||
else:
|
||||
return ConversationHandler.END
|
||||
|
||||
|
||||
def set_api_key(update, context):
|
||||
reply = update.message.text
|
||||
context.user_data[Section.API_Key.value] = reply
|
||||
update.message.reply_text(f'Saved {reply} 💾', reply_markup=ReplyKeyboardRemove())
|
||||
|
||||
return show_menu(update, context)
|
||||
|
||||
|
||||
def set_frequency(update: Update, context: CallbackContext):
|
||||
selected = update.callback_query.data
|
||||
|
||||
if selected != 'cancel':
|
||||
update.callback_query.edit_message_text(f'Saved {selected} 💪')
|
||||
context.user_data[Section.Frequency.value] = selected
|
||||
else:
|
||||
context.bot.delete_message(
|
||||
chat_id=update.callback_query.message.chat_id,
|
||||
message_id=update.callback_query.message.message_id,
|
||||
)
|
||||
|
||||
return show_menu(update, context)
|
||||
|
||||
|
||||
def cancel(update: Update, context: CallbackContext):
|
||||
update.message.reply_text('Canceled', reply_markup=ReplyKeyboardRemove())
|
||||
return ConversationHandler.END
|
||||
|
||||
|
||||
config_handler = ConversationHandler(
|
||||
entry_points=[CommandHandler('config', config)],
|
||||
|
||||
states={
|
||||
MENU: [CallbackQueryHandler(menu)],
|
||||
API_KEY: [
|
||||
CommandHandler('cancel', cancel),
|
||||
MessageHandler(Filters.all, set_api_key),
|
||||
],
|
||||
FREQUENCY: [CallbackQueryHandler(set_frequency)],
|
||||
},
|
||||
|
||||
fallbacks=[CommandHandler('cancel', cancel)]
|
||||
)
|
94
src/commands/other.py
Normal file
94
src/commands/other.py
Normal file
@ -0,0 +1,94 @@
|
||||
from asyncio import sleep, run
|
||||
from datetime import datetime
|
||||
from threading import Timer
|
||||
|
||||
from pytimeparse import parse
|
||||
from telegram import Update, ParseMode
|
||||
from telegram.ext import CallbackContext
|
||||
from telegram.ext.dispatcher import run_async
|
||||
|
||||
from market import Market
|
||||
from text import INTRO_TEXT
|
||||
from utils import Section, persistence, updater, current_timestamp, delta_timestamp
|
||||
|
||||
SENDING = False
|
||||
|
||||
|
||||
def error(update: Update, context: CallbackContext):
|
||||
print(context.error)
|
||||
|
||||
|
||||
def start(update: Update, context: CallbackContext):
|
||||
update.message.reply_markdown(INTRO_TEXT)
|
||||
|
||||
|
||||
@run_async
|
||||
def data(update: Update, context: CallbackContext):
|
||||
delta = current_timestamp() - context.user_data.setdefault(Section.Interval.value, delta_timestamp(days=365))
|
||||
send_update_to_user(user=update.effective_user['id'], delta=delta)
|
||||
|
||||
|
||||
def send_update_to_user(user: str, delta: int):
|
||||
user_data = None
|
||||
try:
|
||||
user_data = persistence.user_data[user]
|
||||
running = user_data.setdefault(Section.Running.value, False)
|
||||
print(f'Running {user} - {user_data}')
|
||||
|
||||
if Section.API_Key.value not in user_data:
|
||||
updater.bot.send_message(user, text='API Key not set ⛔️')
|
||||
return
|
||||
|
||||
if running:
|
||||
updater.bot.send_message(user, text='Already running 🏃')
|
||||
return
|
||||
|
||||
print(f'Sending updates to {user}')
|
||||
user_data[Section.Running.value] = True
|
||||
user_data[Section.LastRun.value] = current_timestamp()
|
||||
|
||||
market = Market(user_data[Section.API_Key.value])
|
||||
updater.bot.send_message(user, text='Getting updates 🌎')
|
||||
|
||||
first = True
|
||||
for item in user_data.get(Section.Watchlist.value, []):
|
||||
if first:
|
||||
first = False
|
||||
else:
|
||||
# Wait to not overload the api
|
||||
msg = updater.bot.send_message(user, text='Waiting 60 seconds for API... ⏳')
|
||||
run(sleep(60))
|
||||
msg.delete()
|
||||
|
||||
msg = updater.bot.send_message(user, text=f'Calculating {item}... ⏳')
|
||||
chart = market.get_wma(item, datetime.fromtimestamp(delta))
|
||||
msg.delete()
|
||||
updater.bot.send_photo(user, photo=chart, caption=f'*{item}*',
|
||||
parse_mode=ParseMode.MARKDOWN, disable_notification=True)
|
||||
|
||||
except Exception as e:
|
||||
print(f'❌ {user} - {e}')
|
||||
updater.bot.send_message(user, text=f'There was an error ⚠️\n {e}')
|
||||
finally:
|
||||
if user_data:
|
||||
user_data[Section.Running.value] = False
|
||||
|
||||
|
||||
def send_updates(context: CallbackContext):
|
||||
global SENDING
|
||||
try:
|
||||
if SENDING:
|
||||
return
|
||||
|
||||
SENDING = True
|
||||
now = current_timestamp()
|
||||
|
||||
for user, user_data in persistence.user_data.items():
|
||||
last_run = user_data.setdefault(Section.LastRun.value, 0)
|
||||
frequency = parse(user_data.setdefault(Section.Frequency.value, '1d'))
|
||||
|
||||
if last_run + frequency < now:
|
||||
delta = now - user_data.setdefault(Section.Interval.value, delta_timestamp(days=365))
|
||||
send_update_to_user(user=user, delta=delta)
|
||||
finally:
|
||||
SENDING = False
|
45
src/commands/watchlist.py
Normal file
45
src/commands/watchlist.py
Normal file
@ -0,0 +1,45 @@
|
||||
from telegram import Update
|
||||
from telegram.ext import CallbackContext
|
||||
|
||||
from limited_list import LimitedList
|
||||
from utils import parse_command, config, Section
|
||||
|
||||
|
||||
def get_watchlist(context: CallbackContext) -> LimitedList:
|
||||
return LimitedList(
|
||||
config[Section.Watchlist.value]['max_items'],
|
||||
context.user_data.setdefault(Section.Watchlist.value, []),
|
||||
)
|
||||
|
||||
|
||||
def save_watchlist(context: CallbackContext, l: LimitedList):
|
||||
context.user_data[Section.Watchlist.value] = l.all()
|
||||
|
||||
|
||||
def watchlist_add(update: Update, context: CallbackContext):
|
||||
value, *rest = parse_command(update)
|
||||
|
||||
wl = get_watchlist(context)
|
||||
wl.add(str(value).upper())
|
||||
save_watchlist(context, wl)
|
||||
update.message.reply_text('Saved 💾')
|
||||
|
||||
|
||||
def watchlist_delete(update: Update, context: CallbackContext):
|
||||
value, *rest = parse_command(update)
|
||||
wl = get_watchlist(context)
|
||||
found = wl.delete(value)
|
||||
save_watchlist(context, wl)
|
||||
update.message.reply_text('Deleted 🗑' if found else 'Not found ❓')
|
||||
|
||||
|
||||
def watchlist_all(update: Update, context: CallbackContext):
|
||||
items = get_watchlist(context).all()
|
||||
update.message.reply_text('\n'.join(items) if len(items) > 0 else 'Your list is empty 📭')
|
||||
|
||||
|
||||
def watchlist_clear(update: Update, context: CallbackContext):
|
||||
wl = get_watchlist(context)
|
||||
wl.clear()
|
||||
save_watchlist(context, wl)
|
||||
update.message.reply_text('Cleared 🧼')
|
@ -1,21 +1,27 @@
|
||||
from typing import List
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
class LimitedList:
|
||||
"""
|
||||
Basically a List that has a maximum amount of entries.
|
||||
When full and new elements get appended the oldest gets deleted.
|
||||
"""
|
||||
|
||||
def __init__(self, init: List[str], limit: int):
|
||||
self.data = init
|
||||
def __init__(self, limit: int, init: Optional[List[str]]):
|
||||
self.data = init if init else []
|
||||
self.limit = limit
|
||||
|
||||
def _is_index(self, i: int) -> bool:
|
||||
return False if i < 0 or i > len(self.data) - 1 else True
|
||||
|
||||
def add(self, value: str):
|
||||
print(f'Before {self.data}')
|
||||
# Delete oldest element if there are too many
|
||||
if len(self.data) + 1 > self.limit:
|
||||
self.data = self.data[1:]
|
||||
|
||||
self.data.append(value)
|
||||
print(f'After {self.data}')
|
||||
|
||||
def get(self, i: int):
|
||||
return self.data[i] if self._is_index(i) else None
|
@ -1,10 +1,11 @@
|
||||
import io
|
||||
from io import BytesIO
|
||||
from typing import BinaryIO
|
||||
from alpha_vantage.techindicators import TechIndicators
|
||||
from alpha_vantage.timeseries import TimeSeries
|
||||
from datetime import datetime
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import pandas as pd
|
||||
from datetime import datetime
|
||||
from alpha_vantage.techindicators import TechIndicators
|
||||
from alpha_vantage.timeseries import TimeSeries
|
||||
|
||||
|
||||
class Market:
|
||||
@ -13,28 +14,33 @@ class Market:
|
||||
self.ti = TechIndicators(key=key, output_format='pandas')
|
||||
self.ts = TimeSeries(key=key, output_format='pandas')
|
||||
|
||||
def get_wma(self, stock: str, since: datetime) -> BinaryIO:
|
||||
plt.clf()
|
||||
fig = plt.figure()
|
||||
ax = plt.gca()
|
||||
ax.set_ylabel('Dollar')
|
||||
|
||||
def sort_data_and_plot(data: pd.DataFrame, x: str, y: str, label: str):
|
||||
@staticmethod
|
||||
def sort_data_and_plot(data: pd.DataFrame, label: str, since: datetime, x: str, y: str, ax):
|
||||
data = data.reset_index()
|
||||
data['date'] = data['date'].astype('datetime64[ns]')
|
||||
data = data.loc[since < data['date']]
|
||||
data = data.loc[data[y] > 0.0].dropna() # Remove zero values
|
||||
data.plot(x=x, y=y, ax=ax, label=label)
|
||||
|
||||
df: pd.DataFrame = self.ts.get_daily(symbol=stock, outputsize='full')[0]
|
||||
sort_data_and_plot(data=df, x='date', y='1. open', label='Value')
|
||||
def get_wma(self, stock: str, since: datetime) -> BinaryIO:
|
||||
# Init the plot
|
||||
plt.clf()
|
||||
fig = plt.figure()
|
||||
ax = plt.gca()
|
||||
ax.set_ylabel('Dollar')
|
||||
|
||||
# Real price
|
||||
df: pd.DataFrame = self.ts.get_daily(symbol=stock, outputsize='full')[0]
|
||||
Market.sort_data_and_plot(data=df, label='Value', since=since, x='date', y='1. open', ax=ax)
|
||||
|
||||
# Add wma for 45, 90, 120 days
|
||||
for period in [45, 90, 120]:
|
||||
df: pd.DataFrame = self.ti.get_wma(symbol=stock, interval='daily', time_period=period)[0]
|
||||
sort_data_and_plot(data=df, x='date', y='WMA', label=str(period))
|
||||
Market.sort_data_and_plot(data=df, label=str(period), since=since, x='date', y='WMA', ax=ax)
|
||||
|
||||
# Draw and return png as buffer
|
||||
plt.plot()
|
||||
buffer = io.BytesIO()
|
||||
buffer = BytesIO()
|
||||
fig.savefig(buffer, format='png')
|
||||
buffer.seek(0)
|
||||
return buffer
|
@ -1,24 +1,32 @@
|
||||
import matplotlib as mpl
|
||||
from telegram.ext import CommandHandler
|
||||
|
||||
from Utils import updater
|
||||
from Commands import watchlist_add, watchlist_delete, watchlist_all, watchlist_clear, \
|
||||
set_api_key, get_api_key, start, data, send_updates
|
||||
from utils import updater, persistence
|
||||
from commands.config import config_handler
|
||||
from commands.watchlist import watchlist_add, watchlist_delete, watchlist_all, watchlist_clear
|
||||
from commands.other import start, data, send_updates
|
||||
|
||||
|
||||
def main():
|
||||
# Setup
|
||||
mpl.use('agg')
|
||||
dp = updater.dispatcher
|
||||
jq = updater.job_queue
|
||||
|
||||
# Handlers
|
||||
dp.add_handler(CommandHandler('add', watchlist_add))
|
||||
dp.add_handler(CommandHandler('delete', watchlist_delete))
|
||||
dp.add_handler(CommandHandler('list', watchlist_all))
|
||||
dp.add_handler(CommandHandler('clear', watchlist_clear))
|
||||
dp.add_handler(CommandHandler('setkey', set_api_key))
|
||||
dp.add_handler(CommandHandler('getkey', get_api_key))
|
||||
dp.add_handler(config_handler)
|
||||
dp.add_handler(CommandHandler('start', start))
|
||||
dp.add_handler(CommandHandler('data', data))
|
||||
|
||||
# Cron jobs
|
||||
jq.run_repeating(send_updates, interval=30, first=0)
|
||||
|
||||
# Start
|
||||
print('Started 🚀')
|
||||
send_updates()
|
||||
updater.start_polling()
|
||||
updater.idle()
|
||||
|
15
src/text.py
Normal file
15
src/text.py
Normal file
@ -0,0 +1,15 @@
|
||||
INTRO_TEXT = """
|
||||
*Welcome! 👋*
|
||||
|
||||
*1.* First you will need to get a (free) api token for the stock data.
|
||||
[https://www.alphavantage.co/support/#api-key](Alphavantage Key 🔑)
|
||||
|
||||
*2.* Then enter it by sending the code to me with `/setKey myApiCode`
|
||||
|
||||
*3.* Add stocks or ETFs to your `/list` by going to [https://finance.yahoo.com/](Yahoo Finance 📈) and the sending it to `/add`
|
||||
_Example_ For Apple `/add AAPL`
|
||||
|
||||
*4.* Optionally choose how often you will get updates by calling the `/frequency` command.
|
||||
|
||||
Enjoy 🚀
|
||||
""".lstrip()
|
@ -1,24 +1,16 @@
|
||||
from datetime import datetime, timedelta
|
||||
from enum import Enum
|
||||
|
||||
from yaml import load, Loader
|
||||
from telegram import Update
|
||||
from telegram.ext import PicklePersistence, Updater
|
||||
from yaml import load, Loader
|
||||
from enum import Enum
|
||||
import pickle
|
||||
|
||||
DB_FILE = './data.db'
|
||||
CONFIG_FILE = './config.yml'
|
||||
DEFAULT_DATA = {
|
||||
'user_data': {},
|
||||
'chat_data': {},
|
||||
'conversations': {},
|
||||
}
|
||||
|
||||
try:
|
||||
pickle.load(open(DB_FILE, 'rb'))
|
||||
except:
|
||||
pickle.dump(DEFAULT_DATA, open(DB_FILE, 'wb'))
|
||||
|
||||
config = load(open(CONFIG_FILE, 'r'), Loader=Loader)
|
||||
persistence = PicklePersistence(DB_FILE)
|
||||
# persistence.load_singlefile()
|
||||
updater: Updater = Updater(config['token'], use_context=True, persistence=persistence)
|
||||
|
||||
|
||||
@ -26,8 +18,25 @@ class Section(Enum):
|
||||
Watchlist = 'watchlist'
|
||||
API_Key = 'api_key'
|
||||
Running = 'running'
|
||||
Interval = 'interval' # Time axis of the graph
|
||||
Frequency = 'frequency' # How ofter updates should be sent
|
||||
LastRun = 'last_run'
|
||||
|
||||
|
||||
def current_timestamp():
|
||||
return int(datetime.now().timestamp())
|
||||
|
||||
|
||||
def delta_timestamp(**kwargs):
|
||||
return int(timedelta(**kwargs).total_seconds())
|
||||
|
||||
|
||||
def parse_command(update: Update) -> (str, str):
|
||||
key, value = (update.message.text.split(' ', 1)[1].split(' ', 1) + [None])[:2]
|
||||
return key, value
|
||||
|
||||
|
||||
def parse_callback(update: Update) -> str:
|
||||
selected = update.callback_query.data
|
||||
cleaned = ''.join(selected.split(':')[1:]) # Remove the pattern from the start
|
||||
return cleaned
|
Loading…
Reference in New Issue
Block a user