From a7aceccf4e6d247ef3f4d02cfdbd23e46fa3092e Mon Sep 17 00:00:00 2001 From: gpkvt Date: Sat, 13 Aug 2022 19:35:02 +0200 Subject: [PATCH] Restructured code --- tts.py | 543 +++++++++++++++++++++++++++++++++------------------------ 1 file changed, 316 insertions(+), 227 deletions(-) diff --git a/tts.py b/tts.py index 441be3a..10d5661 100644 --- a/tts.py +++ b/tts.py @@ -95,6 +95,274 @@ class IRC: """ self.irc.send(bytes("PRIVMSG "+channel+" :"+user+" "+msg+"\r\n", "UTF-8")) + def resp_ping(self): + """ Respond to PING """ + logging.debug('PING received') + self.irc.send(bytes('PONG :tmi.twitch.tv\r\n', "UTF-8")) + + def resp_notice(self, resp): + """ Respond to NOTICE """ + if 'Login authentication failed' in resp: + try: + raise RuntimeError() + except RuntimeError: + logging.exception('Login failed, please check your credentials and try again.') + sys.exit(251) + + def resp_clearmsg(self, resp): + """ Respond to CLEARMSG """ + logging.info('CLEARMSG received') + msgid = False + + global msg_queue_raw # pylint: disable=global-statement,invalid-name + filtered_msg_queue = [] + + tags = resp.split(';') + for tag in tags: + if "target-msg-id=" in tag: + msgid = tag.rsplit('target-msg-id=',1)[1] + logging.debug('Trying to suppress message') + logging.debug(msgid) + + for msg in list(msg_queue_raw): + if msg['msgid'] == msgid: + logging.info('Suppressing message %s', msgid) + else: + filtered_msg_queue.append(msg) + + msg_queue_raw = filtered_msg_queue + + def resp_privmsg(self, resp): + """ Respond to PRIVMSG """ + logging.debug('PRIVMSG received') + + tags = self.get_tags(resp) + message = self.get_message(resp) + + user = tags['user'] + msg = message['message'] + msglen = message['length'] + + logging.debug('Msg: %s', msg) + logging.debug('Msg length: %s', msglen) + logging.debug('Deny List:') + logging.debug(self.tts_denied) + + self.priviledged_commands(message, tags) + + if msg.startswith('#') and self.quickvote_status is True: + logging.info('Quickvote: Cast detected') + self.pollcount += 1 + self.poll[user] = msg.lower() + logging.debug(self.poll) + + if msg.startswith('!tts'): + self.tts_command(message, tags) + + def get_tags(self, resp): + """ Strip tags from response """ + + tags = resp.split(';') + for tag in tags: + if tag.startswith('badges='): + badges = tag.rsplit('badges=',1)[1] + if tag.startswith('subscriber='): + subscriber = tag.rsplit('subscriber=',1)[1] + logging.debug('Subscriber: %s', subscriber) + if tag.startswith('id='): + msgid = tag.rsplit('id=',1)[1] + logging.debug('Message ID: %s', msgid) + if tag.startswith('display-name='): + user = tag.rsplit('display-name=',1)[1].lower() + logging.debug('Username: %s', user) + + tags = {} + tags['badges'] = badges + tags['subscriber'] = subscriber + tags['msgid'] = msgid + tags['user'] = user + + return tags + + def get_message(self, resp): + """ Process message """ + + msg = {} + msg['message'] = resp.rsplit('PRIVMSG #',1)[1].split(':',1)[1].replace('\r\n','') + msg['length'] = len(msg['message']) + + return msg + + def priviledged_commands(self, message, tags): + """ Process priviledged commands """ + + msg = message['message'] + badges = tags['badges'] + user = tags['user'] + + if 'broadcaster' in badges or 'moderator' in badges: + if msg.startswith('!ping'): + logging.debug("Ping check received.") + self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), "Pong!") + + elif msg.startswith('!dtts'): + logging.debug("!dtts command detected") + self.Commands.dtts(self, msg) + + elif msg.startswith('!random'): + logging.info('!random command detected') + self.Commands.random(self, msg) + + elif msg.startswith('!quickvote'): + logging.info("!quickvote command detected") + self.Commands.quickvote(self, msg) + + elif msg.startswith('!ptts'): + logging.debug("!ptts command detected") + self.Commands.ptts(self, msg) + + elif msg.startswith('!toff'): + logging.info('TTS is now turned off') + msg_queue.clear() + msg_queue_raw.clear() + self.tts_status = False + self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TOFF']) + + elif msg.startswith('!ton'): + logging.info('TTS is now turned on') + msg_queue.clear() + msg_queue_raw.clear() + self.tts_status = True + self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TON']) + + def check_subonly(self, tags): + """ subonly """ + + subscriber = tags['subscriber'] + badges = tags['badges'] + user = tags['user'] + + if subscriber != "0" or 'moderator' in badges or 'broadcaster' in badges: + logging.debug('TTS is sub-only and user has allowance') + return False + + logging.info('TTS is sub-only') + self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['SUBONLY']) + return True + + def check_modonly(self, tags): + """ modonly """ + + badges = tags['badges'] + user = tags['user'] + + if 'moderator' in badges or 'broadcaster' in badges: + logging.debug('TTS is mod-only and user has allowance') + return False + + logging.info('TTS is sub-only') + self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['MODONLY']) + return True + + def check_tts_enabled(self, user): + """ Check if TTS is enabled """ + if not self.tts_status: + logging.info('TTS is disabled') + self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['DISABLED']) + return False + + logging.debug('TTS is enabled') + return True + + def check_msg_too_long(self, message, user): + """ Check if message is too long """ + + if message['length'] > conf['IRC_TTS_LEN']: + logging.info('TTS message is to long') + self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TOO_LONG']) + return True + + logging.info('Check length: Message is ok') + return False + + def check_user_denied(self, user): + """ Check if user is on denied list """ + if user in self.tts_denied: + logging.info("%s is not allowed to use TTS", user) + self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['DENIED']) + return True + + logging.debug("%s is allowed to use TTS", user) + return False + + def check_whitelist(self, user): + """ Check Whitelist """ + + if conf['WHITELIST']: + if user not in self.tts_allowed: + logging.info('User is not on whitelist') + logging.info(self.tts_allowed) + self.sendmsg( + conf['IRC_CHANNEL'], + "@"+str(user), conf['MESSAGE']['WHITELISTONLY'] + ) + return False + return True + return False + + def send_tts_msg(self, message, tags): + """ Send message to TTS queue """ + logging.info('Valid TTS message, adding to raw queue') + + tts = True + now = datetime.datetime.now() + timestamp = str(time.time_ns()) + + user = tags['user'] + msgid = tags['msgid'] + badges = tags['badges'] + subscriber = tags['subscriber'] + + msg = message['message'] + msglen = message['length'] + msg = msg.replace('!tts','',1) + + msg = { + "TTS": tts, + "msg": msg, + "badges": badges, + "subscriber": subscriber, + "msgid": msgid, + "user": user, + "length": msglen, + "queuetime": now, + "timestamp": timestamp + } + + msg_queue_raw.append(msg) + + def tts_command(self, message, tags): + """ Process !tts command """ + logging.debug('!tts command detected') + logging.debug("tts status: %s", self.tts_status) + + user = tags['user'] + + if self.check_tts_enabled(user): + return False + elif self.check_msg_too_long(message, user): + return False + elif self.check_user_denied(user): + return False + elif self.check_subonly(tags): + return False + elif self.check_modonly(tags): + return False + elif self.check_whitelist(user): + return False + + self.send_tts_msg(message, tags) + def get_response(self): """Get and process response from IRC""" try: @@ -102,202 +370,22 @@ class IRC: logging.debug('resp:') logging.debug(resp) except socket.timeout: - return False + return except Exception: # pylint: disable=broad-except logging.exception('An unknown error occured while getting a IRC response.') sys.exit(255) if resp.find('PING') != -1: - logging.debug('PING received') - self.irc.send(bytes('PONG :tmi.twitch.tv\r\n', "UTF-8")) + self.resp_ping() if resp.find('CLEARMSG') != -1: - logging.info('CLEARMSG received') - msgid = False - - global msg_queue_raw # pylint: disable=global-statement,invalid-name - filtered_msg_queue = [] - - tags = resp.split(';') - for tag in tags: - if "target-msg-id=" in tag: - msgid = tag.rsplit('target-msg-id=',1)[1] - logging.debug('Trying to suppress message') - logging.debug(msgid) - - for msg in list(msg_queue_raw): - if msg['msgid'] == msgid: - logging.info('Suppressing message %s', msgid) - else: - filtered_msg_queue.append(msg) - - msg_queue_raw = filtered_msg_queue - - return True + self.resp_clearmsg(resp) if resp.find('NOTICE') != -1: - if 'Login authentication failed' in resp: - try: - raise RuntimeError() - except RuntimeError: - logging.exception('Login failed, please check your credentials and try again.') - sys.exit(251) + self.resp_notice(resp) if resp.find('PRIVMSG') != -1: - logging.debug('PRIVMSG received') - badges = False - subscriber = False - msgid = False - msg = False - msglen = False - user = False - tts = False - - tags = resp.split(';') - for tag in tags: - if tag.startswith('badges='): - badges = tag.rsplit('badges=',1)[1] - if tag.startswith('subscriber='): - subscriber = tag.rsplit('subscriber=',1)[1] - logging.debug('Subscriber: %s', subscriber) - if tag.startswith('id='): - msgid = tag.rsplit('id=',1)[1] - logging.debug('Message ID: %s', msgid) - if tag.startswith('display-name='): - user = tag.rsplit('display-name=',1)[1].lower() - logging.debug('Username: %s', user) - - msg = resp.rsplit('PRIVMSG #',1)[1] - msg = msg.split(':',1)[1] - msg = msg.replace('\r\n','') - msglen = len(msg) - - logging.debug('Msg: %s', msg) - logging.debug('Msg length: %s', msglen) - logging.debug('Deny List:') - logging.debug(self.tts_denied) - - if msg.startswith('#') and self.quickvote_status is True: - logging.info('Quickvote: Cast detected') - self.pollcount += 1 - self.poll[user] = msg.lower() - logging.debug(self.poll) - return True - - if 'broadcaster' in badges or 'moderator' in badges: - if msg.startswith('!ping'): - logging.debug("Ping check received.") - self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), "Pong!") - return True - - if msg.startswith('!dtts'): - logging.debug("!dtts command detected") - self.Commands.dtts(self, msg) - return True - - if msg.startswith('!random'): - logging.info('!random command detected') - self.Commands.random(self, msg) - return True - - if msg.startswith('!quickvote'): - logging.info("!quickvote command detected") - self.Commands.quickvote(self, msg) - return True - - if msg.startswith('!ptts'): - logging.debug("!ptts command detected") - self.Commands.ptts(self, msg) - return True - - if msg.startswith('!toff'): - logging.info('TTS is now turned off') - msg_queue.clear() - msg_queue_raw.clear() - self.tts_status = False - self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TOFF']) - return True - - if msg.startswith('!ton'): - logging.info('TTS is now turned on') - msg_queue.clear() - msg_queue_raw.clear() - self.tts_status = True - self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TON']) - return True - - if msg.startswith('!tts'): - logging.debug('!tts command detected') - logging.debug("tts status: %s", self.tts_status) - - if not self.tts_status: - logging.info('TTS is disabled') - self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['DISABLED']) - return False - - if msglen > conf['IRC_TTS_LEN']: - logging.info('TTS message is to long') - self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TOO_LONG']) - return False - - if conf['IRC_SUBONLY']: - if subscriber != "0" or 'moderator' in badges or 'broadcaster' in badges: - logging.debug('TTS is sub-only and user has allowance') - else: - logging.info('TTS is sub-only') - self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['SUBONLY']) - return False - - if conf['IRC_MODONLY']: - if 'moderator' in badges or 'broadcaster' in badges: - logging.debug('TTS is mod-only and user has allowance') - else: - logging.info('TTS is sub-only') - self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['MODONLY']) - return False - - if user in self.tts_denied: - logging.info("%s is not allowed to use TTS", user) - self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['DENIED']) - return False - - if conf['WHITELIST']: - if user not in self.tts_allowed: - logging.info('User is not on whitelist') - logging.info(self.tts_allowed) - self.sendmsg( - conf['IRC_CHANNEL'], - "@"+str(user), conf['MESSAGE']['WHITELISTONLY'] - ) - return False - - logging.warning('Nobody is on the whitelist.') - self.sendmsg( - conf['IRC_CHANNEL'], - "@"+str(user), conf['MESSAGE']['WHITELISTONLY'] - ) - return False - - logging.info('Valid TTS message, adding to raw queue') - tts = True - now = datetime.datetime.now() - msg = msg.replace('!tts','',1) - msg = { - "TTS": tts, - "msg": msg, - "badges": badges, - "subscriber": subscriber, - "msgid": msgid, - "user": user, - "length": msglen, - "queuetime": now, - "timestamp": str(time.time_ns()) - } - msg_queue_raw.append(msg) - - return True - - return False + self.resp_privmsg(resp) class Commands(): """ Bot commands """ @@ -345,7 +433,7 @@ class IRC: self.quickvote_status = False self.poll = {} - return False + return logging.info("Counting votes") count = 0 @@ -707,6 +795,22 @@ if sys.argv[1:]: print('Version 1.2.1') sys.exit(1) +def send_tts_queue(): + """ Send messages to TTS """ + + for raw_msg in msg_queue_raw: + logging.debug('Raw msg: %s', msg_queue_raw) + + now = datetime.datetime.now() + if now - raw_msg['queuetime'] > datetime.timedelta(seconds=conf['IRC_CLEARMSG_TIMEOUT']): + logging.debug('clearmsg_timeout reached') + if raw_msg['timestamp'] not in msg_queue: + logging.info('Sending TTS message') + msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] + logging.debug("msg_queue: %s", msg_queue) + else: + logging.debug('Msg is already in queue') + def main(): """Main loop""" @@ -734,58 +838,43 @@ def main(): logging.info('Please complete the OAuth process within the next 15 minutes.') time.sleep(900) sys.exit(250) - else: - logging.info("Starting IRC bot") - irc = IRC() - irc.connect(conf['IRC_SERVER'], 6667, conf['IRC_CHANNEL'], conf['IRC_USERNAME'], conf['IRC_OAUTH_TOKEN']) - irc.sendmsg(conf['IRC_CHANNEL'], 'MrDestructoid', conf['MESSAGE']['READY']) + logging.info("Starting IRC bot") + irc = IRC() - logging.info("Please open your browser and visit: http://%s:%s/", conf['HTTP_BIND'], conf['HTTP_PORT']) - url = 'http://'+str(conf['HTTP_BIND'])+':'+str(conf['HTTP_PORT']) - webbrowser.open_new_tab(url) + irc.connect(conf['IRC_SERVER'], 6667, conf['IRC_CHANNEL'], conf['IRC_USERNAME'], conf['IRC_OAUTH_TOKEN']) + irc.sendmsg(conf['IRC_CHANNEL'], 'MrDestructoid', conf['MESSAGE']['READY']) - while True: - if conf['LOG_LEVEL'] == "DEBUG": - time.sleep(1) + logging.info("Please open your browser and visit: http://%s:%s/", conf['HTTP_BIND'], conf['HTTP_PORT']) + url = 'http://'+str(conf['HTTP_BIND'])+':'+str(conf['HTTP_PORT']) + webbrowser.open_new_tab(url) - try: - irc.get_response() + while True: + if conf['LOG_LEVEL'] == "DEBUG": + time.sleep(1) - confreload = datetime.datetime.now() - if confreload - lastreload > datetime.timedelta(seconds=60): - conf = load_config() - lastreload = datetime.datetime.now() + try: + irc.get_response() - if irc.quickvote_status and irc.votemsg: - logging.info('Quickvote is active') - irc.sendmsg(conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTESTART'] + " (" + str(irc.votemsg) + ")") + confreload = datetime.datetime.now() + if confreload - lastreload > datetime.timedelta(seconds=60): + conf = load_config() + lastreload = datetime.datetime.now() - if not irc.tts_status: - logging.debug("TTS is disabled") - if conf['LOG_LEVEL'] == "DEBUG": - time.sleep(1) - continue + if irc.quickvote_status and irc.votemsg: + logging.info('Quickvote is active') + irc.sendmsg(conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTESTART'] + " (" + str(irc.votemsg) + ")") + if not irc.tts_status: + continue + else: logging.debug('Raw message queue:') logging.debug(msg_queue_raw) + send_tts_queue() - for raw_msg in msg_queue_raw: - logging.debug('Raw msg:') - logging.debug(msg_queue_raw) - - now = datetime.datetime.now() - if now - raw_msg['queuetime'] > datetime.timedelta(seconds=conf['IRC_CLEARMSG_TIMEOUT']): - logging.debug('clearmsg_timeout reached') - if raw_msg['timestamp'] not in msg_queue: - logging.info('Sending TTS message') - msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] - logging.debug(msg_queue) - else: - logging.debug('Msg is already in queue') - except KeyboardInterrupt: - logging.info('Exiting...') - sys.exit() + except KeyboardInterrupt: + logging.info('Exiting...') + sys.exit() if __name__ == "__main__": main()