#!/usr/bin/python3 # -*- coding: utf-8 -*- # pylint: disable=global-statement,broad-except,too-many-lines """ TwitchTTS Copyright (C) 2022 gpkvt This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ import os import sys import time import json import signal import socket import random import logging import datetime import webbrowser import urllib.request from threading import Thread from collections import Counter from urllib.parse import parse_qs from urllib.error import HTTPError from http.server import HTTPServer, BaseHTTPRequestHandler from socketserver import ThreadingMixIn import yaml import requests import wikipedia from fuzzywuzzy import process class IRC: """ IRC bot """ irc = socket.socket() def __init__(self): self.irc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.tts = { "status": CONF['TTS_STARTENABLED'], "whitelist": [], "blacklist": [] } self.quickvote = { "status": False, "message": False, "count": 0, "data": {} } self.pick = { "status": False, "number": 1, "count": 0, "pickme": [] } if 'WHITELIST_USER' in CONF: self.tts_allowed = CONF['WHITELIST_USER'] def connect(self, server, port, channel, botnick, botpass): """ Connect to Twitch IRC servers :param str server: Servername or IP :param int port: Server Port :param str channel: Channel to connect to :param str botnick: Username :param str botpass: OAuth Token """ logging.info("Connecting to: %s", server) logging.info('Waiting...') try: self.irc.connect((server, port)) except ConnectionResetError: logging.fatal('Twitch refused to connect, please check your settings and try again.') sys.exit(252) self.irc.settimeout(1) self.irc.send(bytes("PASS " + botpass + "\r\n", "UTF-8")) self.irc.send(bytes( "USER " + botnick + " " + botnick +" " + botnick + " :python\r\n", "UTF-8") ) self.irc.send(bytes("NICK " + botnick + "\r\n", "UTF-8")) self.irc.send(bytes("CAP REQ :twitch.tv/commands twitch.tv/tags \r\n", "UTF-8")) time.sleep(5) try: self.irc.send(bytes("JOIN " + channel + "\r\n", "UTF-8")) except ConnectionResetError: logging.warning('JOIN was refused, will try again in 30 seconds.') time.sleep(30) logging.warning('Please check your credentials, if this error persists.') self.irc.send(bytes("JOIN " + channel + "\r\n", "UTF-8")) def sendmsg(self, channel, user, msg): """ Send (a public) message to IRC channel :param str channel: Channel to post msg to :param str user: User to address message to :param str msg: Message to send """ self.irc.send(bytes("PRIVMSG "+channel+" :"+user+" "+msg+"\r\n", "UTF-8")) def resp_ping(self): """ Respond to PING Respond to PING messages, to keep the connection alive. """ logging.debug('PING received') self.irc.send(bytes('PONG :tmi.twitch.tv\r\n', "UTF-8")) def resp_notice(self, resp): """ Respond to NOTICE :param str resp: IRC Server message (in response to command) """ if 'Login authentication failed' in resp: try: raise RuntimeError() except RuntimeError: logging.exception('Login failed, please check your credentials and try again.') sys.exit(251) def resp_clearmsg(self, resp): """ Respond to CLEARMSG :param str resp: IRC Server message """ logging.info('CLEARMSG received') msgid = False global MSG_QUEUE_RAW filtered_msg_queue = [] tags = resp.split(';') for tag in tags: if "target-msg-id=" in tag: msgid = tag.rsplit('target-msg-id=',1)[1] logging.debug('Trying to suppress message') logging.debug(msgid) for msg in list(MSG_QUEUE_RAW): if msg['msgid'] == msgid: logging.info('Suppressing message %s', msgid) else: filtered_msg_queue.append(msg) MSG_QUEUE_RAW = filtered_msg_queue def resp_privmsg(self, resp): """ Respond to PRIVMSG :param str resp: IRC Server message """ logging.debug('PRIVMSG received') tags = self.get_tags(resp) message = self.get_message(resp) self.priviledged_commands(message, tags) self.unpriviledged_commands(message, tags) def get_tags(self, resp): """ Strip tags from response :param str resp: IRC Server message :return dict tags: Message metadata (tags) """ tags = resp.split(';') for tag in tags: if tag.startswith('badges='): badges = tag.rsplit('badges=',1)[1] if tag.startswith('subscriber='): subscriber = tag.rsplit('subscriber=',1)[1] logging.debug('Subscriber: %s', subscriber) if tag.startswith('id='): msgid = tag.rsplit('id=',1)[1] logging.debug('Message ID: %s', msgid) if tag.startswith('display-name='): user = tag.rsplit('display-name=',1)[1].lower() logging.debug('Username: %s', user) tags = {} tags['badges'] = badges tags['subscriber'] = subscriber tags['msgid'] = msgid tags['user'] = user return tags def get_message(self, resp): """ Transform IRC server message and determine length :param str resp: IRC Server message :return dict msg: Processed message """ msg = {} msg['message'] = resp.rsplit('PRIVMSG #',1)[1].split(':',1)[1].replace('\r\n','') msg['length'] = len(msg['message']) return msg def unpriviledged_commands(self, message, tags): """ Process unpriviledged commands :param dict message: Message :param dict tags: Message metadata (tags) """ msg = message['message'] user = tags['user'] if msg.startswith('#pickme') and self.pick['status'] is True: logging.info('Pickme detected') self.pick['count'] = self.pick['count'] + 1 self.pick['pickme'].append(user) logging.debug("pickme %s added", user) return if msg.startswith('#') and self.quickvote['status'] is True: logging.info('Quickvote: Cast detected') self.quickvote['count'] += 1 self.quickvote['data'][user] = msg.lower() logging.debug("quickvote: %s", self.quickvote) return if msg.startswith('!tts'): logging.info('!tts command detected') self.ttscmd(message, tags) return if msg.startswith('!wiki') and CONF['FEATURE']['WIKI']: logging.debug("!wiki command detected") self.wiki(tags, msg) return if CONF['FEATURE']['QUOTE']: if msg.startswith('!addquote'): logging.debug("!addquote command detected") self.addquote(tags, msg) return if msg.startswith('!smartquote') or msg.startswith('!sq'): logging.debug("!smartquote command detected") self.quote(tags, msg) return def priviledged_commands(self, message, tags): """ Process priviledged commands :param dict message: Message :param dict tags: Message metadata (tags) """ msg = message['message'] badges = tags['badges'] user = tags['user'] if 'broadcaster' in badges or 'moderator' in badges: if msg.startswith('!ping') and CONF['FEATURE']['PING']: logging.debug("Ping check received.") self.sendmsg( CONF['IRC_CHANNEL'], "@"+str(user), "Pong!" ) elif msg.startswith('!version') and CONF['FEATURE']['VERSION']: logging.debug("!version command detected") self.sendmsg( CONF['IRC_CHANNEL'], "@"+str(user), VERSION ) elif msg.startswith('!toff'): logging.info('TTS is now turned off') msg_queue.clear() MSG_QUEUE_RAW.clear() self.tts['status'] = False self.sendmsg( CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['TOFF'] ) elif msg.startswith('!ton'): logging.info('TTS is now turned on') msg_queue.clear() MSG_QUEUE_RAW.clear() self.tts['status'] = True self.sendmsg( CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['TON'] ) elif msg.startswith('!pick') and CONF['FEATURE']['PICK']: logging.debug("!pick command detected") self.pickcmd(msg) elif msg.startswith('!random') and CONF['FEATURE']['RANDOM']: logging.info('!random command detected') self.random(msg) elif msg.startswith('!quickvote') and CONF['FEATURE']['QUOTE']: logging.info("!quickvote command detected") self.quickvotecmd(msg) elif msg.startswith('!ptts'): logging.debug("!ptts command detected") self.ptts(msg) elif msg.startswith('!dtts'): logging.debug("!dtts command detected") self.dtts(msg) elif msg.startswith('!usermap'): logging.info('!usermap command detected') self.usermap(msg) elif msg.startswith('!delay'): logging.info('!delay command detected') self.delay(msg) def check_subonly(self, tags): """ Check if subonly mode is enabled and sender is sub :param dict tags: Message metadata (tags) :return bool: """ if not CONF['IRC_SUBONLY']: return False subscriber = tags['subscriber'] badges = tags['badges'] user = tags['user'] if subscriber != "0" or 'moderator' in badges or 'broadcaster' in badges: logging.info('TTS is sub-only and user has allowance') return False logging.debug('TTS is sub-only') self.sendmsg( CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['SUBONLY'] ) return True def check_modonly(self, tags): """ Check if modonly mode is enabled and sender is mod :param dict tags: Message metadata (tags) :return bool: """ if not CONF['IRC_MODONLY']: return False badges = tags['badges'] user = tags['user'] if 'moderator' in badges or 'broadcaster' in badges: logging.info('TTS is mod-only and user has allowance') return False logging.debug('TTS is mod-only') self.sendmsg( CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['MODONLY'] ) return True def check_tts_disabled(self, user): """ Check if TTS is disabled :param str user: Username :return bool: """ if not self.tts['status']: self.sendmsg( CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['DISABLED'] ) return True logging.debug('TTS is enabled') return False def check_msg_too_long(self, message, user): """ Check if message is too long :param dict message: Message :param str user: Username :return bool: """ if message['length'] > CONF['IRC_TTS_LEN']: self.sendmsg( CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['TOO_LONG'] ) return True logging.debug('Check length: Message is ok') return False def check_user_denied(self, user): """ Check if user is on denied list Check if the given user is on the TTS blacklist :param str user: Username :return bool: """ if user in self.tts['denied']: logging.info("%s is not allowed to use TTS", user) self.sendmsg( CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['DENIED'] ) return True logging.debug("%s is allowed to use TTS", user) return False def check_whitelist(self, user): """ Check Whitelist Checks if the given user is on the TTS whitelist :param str user: Username :return bool: """ if CONF['WHITELIST']: if user not in self.tts_allowed: logging.debug("tts_allowed: %s", self.tts_allowed) self.sendmsg( CONF['IRC_CHANNEL'], "@"+str(user), CONF['MESSAGE']['WHITELISTONLY'] ) return False return True return False def send_tts_msg(self, message, tags): """ Send message to TTS queue :param dict tags: Message metadata (tags) :param str message: IRC message """ logging.info('Valid TTS message, adding to raw queue') tts = True now = datetime.datetime.now() timestamp = str(time.time_ns()) user = tags['user'] msgid = tags['msgid'] badges = tags['badges'] subscriber = tags['subscriber'] msg = message['message'] msglen = message['length'] msg = msg.replace('!tts','',1) msg = { "TTS": tts, "msg": msg, "badges": badges, "subscriber": subscriber, "msgid": msgid, "user": user, "length": msglen, "queuetime": now, "timestamp": timestamp } MSG_QUEUE_RAW.append(msg) def get_response(self): """ Get and process response from IRC """ try: resp = self.irc.recv(2048).decode("UTF-8") logging.debug('resp: %s', resp) except socket.timeout: return except Exception: logging.exception('An unknown error occured while getting a IRC response.') sys.exit(255) if resp.find('PING') != -1: self.resp_ping() if resp.find('CLEARMSG') != -1: self.resp_clearmsg(resp) if resp.find('NOTICE') != -1: self.resp_notice(resp) if resp.find('PRIVMSG') != -1: self.resp_privmsg(resp) def ttscmd(self, msg, tags): """ !tts command Check if message is valid and send it to queue :param str msg: The IRC message triggering the command :param dict tags: The message metadata """ user = tags['user'] if IRC.check_tts_disabled(self, user): logging.info('TTS is disabled') elif IRC.check_msg_too_long(self, msg, user): logging.info('TTS message is too long') elif IRC.check_user_denied(self, user): logging.info('User is not allowed to use TTS') elif IRC.check_subonly(self, tags): logging.info('TTS is sub-only') elif IRC.check_modonly(self, tags): logging.info('TTS is mod-only') elif IRC.check_whitelist(self, user): logging.info('User is not on whitelist') else: logging.info('Sending TTS message to raw_queue') IRC.send_tts_msg(self, msg, tags) def addquote(self, tags, msg): """ !addquote command Adds a newline to quotes.txt :param dict tags: Message metadata (tags) :param str msg: IRC message triggering the command """ user = tags['user'] if IRC.check_user_denied(self, user): logging.info('User is not allowed to use TTS') elif IRC.check_subonly(self, tags): logging.info('TTS is sub-only') else: try: with open("quotes.txt", "rb") as file: nol = len(file.readlines()) file.close() except FileNotFoundError: logging.warning("quotes.txt does not exists, will create") nol = 0 nol = nol + 1 quote = msg.replace("!addquote ", "").strip() quote = quote.split(" ",1) username = quote[0] date = time.strftime("%d.%m.%Y") try: token = CONF['IRC_OAUTH_TOKEN'].replace('oauth:','') login = CONF['IRC_CHANNEL'].replace('#','') api_endpoint = f"https://api.twitch.tv/helix/users?login={login}" headers = { 'Content-type': 'application/x-form-urlencoded', 'Authorization': 'Bearer '+token, 'Client-Id': 'ebo548vs6tq54c9zlrgin2yfzzlrrs' } req = requests.get(url=api_endpoint, headers=headers) data = req.json() user_id = data['data'][0]['id'] api_endpoint = f"https://api.twitch.tv/helix/channels?broadcaster_id={user_id}" headers = { 'Content-type': 'application/x-form-urlencoded', 'Authorization': 'Bearer '+token, 'Client-Id': 'ebo548vs6tq54c9zlrgin2yfzzlrrs' } req = requests.get(url=api_endpoint, headers=headers) data = req.json() game = data['data'][0]['game_name'] quote = f"#{nol}: \"{quote[1]}\" -{username}/{game} ({date})\n" except Exception: logging.warning('Could not get metadata for quote') quote = f"#{nol}: \"{quote[1]}\" -{username} ({date})\n" logging.info('Adding quote %s', quote) with open("quotes.txt", "ab") as file: file.write(quote.encode('utf-8')) message = f"{CONF['MESSAGE']['QUOTE_ADDED_PREFIX']} \ #{nol} {CONF['MESSAGE']['QUOTE_ADDED_SUFFIX']}" raw_msg = { "TTS": True, "msg": message, "badges": True, "subscriber": True, "msgid": True, "user": CONF['IRC_USERNAME'], "length": CONF['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] IRC.sendmsg(self, CONF['IRC_CHANNEL'], "@"+str(user), message) def wiki(self, tags, msg): """ !wiki command Search for wikipedia articles and return the first 3 sentences :param dict tags: Message metadata (tags) :param str msg: IRC message triggering the command """ try: user = tags['user'] wikipedia.set_lang(CONF['WIKI_LANG']) msg = msg.replace('!wiki', '').strip() wikiresult = wikipedia.summary(msg, sentences=3) IRC.sendmsg(self, CONF['IRC_CHANNEL'], "@"+str(user), wikiresult) IRC.sendmsg(self, CONF['IRC_CHANNEL'], "@"+str(user), wikipedia.page(msg).url) message = wikiresult.replace('==', '') raw_msg = { "TTS": True, "msg": message, "badges": True, "subscriber": True, "msgid": True, "user": 'wikipedia', "length": CONF['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] except wikipedia.exceptions.DisambiguationError: user = f"@{user}" IRC.sendmsg(self, CONF['IRC_CHANNEL'], user, CONF['MESSAGE']['WIKI_TOO_MANY']) except Exception: user = f"@{user}" IRC.sendmsg(self, CONF['IRC_CHANNEL'], user, CONF['MESSAGE']['WIKI_NO_RESULT']) def quote(self, tags, msg = False): """ !smartquote command Gets a line from quotes.txt. If a number if given as msg it fetches the given line number. If a string is given it fetches the best matching line. If nothing is given it fetches a random line. :param dict tags: Message metadata (tags) :param str msg: IRC message triggering the command """ try: user = tags['user'] query = msg.replace('!smartquote', '').strip() query = msg.replace('!sq', '').strip() if query.isdigit(): logging.info('Fetching quote #%s', query) file = open("quotes.txt", "rb") quotes = file.readlines() for line in quotes: if line.decode('utf-8').startswith("#"+str(query)+":"): quote = line break file.close() elif query != "": logging.info('Fetching match for %s', query) file = open("quotes.txt", "rb") quotes = file.readlines() matches = process.extract(query, quotes, limit=10) quotes = [] for match, score in matches: if score >= 60: quotes.append(match) logging.debug('Quotes: %s', quotes) if len(quotes) >= 5: quote = random.choice(quotes) else: quote = quotes[0] else: logging.info('Fetching random quote') with open("quotes.txt", "rb") as file: lines = file.read().splitlines() quote = random.choice(lines) except FileNotFoundError: logging.error('"quotes.txt does not exists.') except IndexError: logging.error('Error fetching quote.') if not 'quote' in vars(): logging.info('No quote found.') quote = CONF['MESSAGE']['QUOTE_NOT_FOUND'] IRC.sendmsg(self, CONF['IRC_CHANNEL'], "", quote) return False if not isinstance(quote, str): quote = quote.decode('utf-8') if IRC.check_tts_disabled(self, user): logging.info('TTS is disabled') elif IRC.check_user_denied(self, user): logging.info('User is not allowed to use TTS') elif IRC.check_subonly(self, tags): logging.info('TTS is sub-only') elif IRC.check_modonly(self, tags): logging.info('TTS is mod-only') elif IRC.check_whitelist(self, user): logging.info('User is not on whitelist') else: logging.info('Sending quote to TTS') logging.debug("Quote: %s", quote) IRC.sendmsg(self, CONF['IRC_CHANNEL'], "", quote) message = quote.rsplit('(', 1)[0] raw_msg = { "TTS": True, "msg": message, "badges": True, "subscriber": True, "msgid": True, "user": CONF['IRC_USERNAME'], "length": CONF['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] return True def delay(self, msg): """ !delay command Adjust the delay setting in config.yml :param str msg: The IRC message triggering the command """ try: delay = msg.split(' ')[1] except Exception: delay = False if delay: with open('config.yml','r', encoding='utf-8') as yamlfile: cur_yaml = yaml.safe_load(yamlfile) cur_yaml['irc']['clearmsg_timeout'] = int(delay) if cur_yaml: with open('config.yml','w', encoding='utf-8') as yamlfile: yaml.safe_dump(cur_yaml, yamlfile) load_config() def usermap(self, msg): """ !usermap command Adds a new entry to usermapping in config.yml :param str msg: The IRC message triggering the command """ try: msg = msg.replace('!usermap ', '') splitmsg = msg.split(" ") username, *mappingname = splitmsg mappingname = ' '.join(mappingname) except Exception: username = False mappingname = False if username and mappingname: with open('config.yml','r', encoding='utf-8') as yamlfile: cur_yaml = yaml.safe_load(yamlfile) cur_yaml['usermapping'].update({username: mappingname}) if cur_yaml: with open('config.yml','w', encoding='utf-8') as yamlfile: yaml.safe_dump(cur_yaml, yamlfile) load_config() def pickcmd(self, msg): """ !pick command Pick a number of users who typed #pickme in the chat after the pick command was started. :param str msg: Number of users to pick """ if self.pick['status']: logging.info('Pick stopped') logging.debug("Got %s participats, wanted %s", self.pick['count'], self.pick['number'] ) try: if int(self.pick['count']) > int(self.pick['number']): picks = random.sample(self.pick['pickme'], self.pick['number']) logging.info('Got more than the requested number of participants') else: picks = self.pick['pickme'] logging.info('Got less than or exactly the \ requested number of participants') converted_picks = [str(element) for element in picks] joined_picks = " ".join(converted_picks) except Exception: logging.error("There was an error during picking.") joined_picks = False message = f"{CONF['MESSAGE']['PICKRESULT']} {joined_picks}" if joined_picks: raw_msg = { "TTS": True, "msg": message, "badges": True, "subscriber": True, "msgid": True, "user": CONF['IRC_USERNAME'], "length": CONF['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] IRC.sendmsg(self, CONF['IRC_CHANNEL'], "", CONF['MESSAGE']['PICKRESULT'] ) IRC.sendmsg(self, CONF['IRC_CHANNEL'], "*", joined_picks ) else: IRC.sendmsg(self, CONF['IRC_CHANNEL'], "*", CONF['MESSAGE']['PICKNONE'] ) self.pick['status'] = False self.pick['pickme'] = [] self.pick['count'] = 0 return logging.debug('Pick started') self.pick['status'] = True try: msg = msg.split(' ')[1].strip() self.pick['number'] = msg except IndexError: self.pick['number'] = self.pick['number'] logging.info("Will pick %s participants", self.pick['number']) IRC.sendmsg(self, CONF['IRC_CHANNEL'], "@chat", CONF['MESSAGE']['PICKSTART'] ) return def quickvotecmd(self, msg): """ !quickvote command Starts or stops the !quickvote function. On stop calculates the 5 most casted votes and send them to chat. The highest vote is send to msg_queue. :param str msg: The IRC message triggering the command """ if self.quickvote['status']: logging.debug('Quickvote stopped') if self.quickvote['count'] == 0: logging.info("Nobody voted") IRC.sendmsg( self, CONF['IRC_CHANNEL'], "@chat", CONF['MESSAGE']['VOTEEND'] ) IRC.sendmsg( self, CONF['IRC_CHANNEL'], "*", CONF['MESSAGE']['VOTENOBODY'] ) raw_msg = { "TTS": True, "msg": CONF['MESSAGE']['VOTENOBODY'], "badges": True, "subscriber": True, "msgid": True, "user": CONF['IRC_USERNAME'], "length": CONF['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] logging.info('The result is: %s', CONF['MESSAGE']['VOTENOBODY']) logging.debug('Votemsg: %s', msg) self.quickvote['status'] = False self.quickvote['data'] = {} return logging.info("Counting votes") count = 0 count = Counter(self.quickvote['data'].values()).most_common(5) IRC.sendmsg(self, CONF['IRC_CHANNEL'], "@chat", CONF['MESSAGE']['VOTEEND']) logging.debug(count) count = str(count[0][0].replace('#','')) message = f"{CONF['MESSAGE']['VOTERESULT']} {count}" raw_msg = { "TTS": True, "msg": message , "badges": True, "subscriber": True, "msgid": True, "user": CONF['IRC_USERNAME'], "length": CONF['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] logging.info('The result is: %s', CONF['MESSAGE']['VOTERESULT'] +" "+ str(count[0])) logging.debug('Votemsg: %s', msg) for key, value in count: message = f"{key} ({value}) {CONF['MESSAGE']['VOTES']})" IRC.sendmsg( self, CONF['IRC_CHANNEL'], "*", message ) self.quickvote['status'] = False self.quickvote['data'] = {} self.quickvote['count'] = 0 return logging.debug('Quickvote started') self.quickvote['status'] = True self.quickvote['message'] = msg.split('!quickvote', 1)[1].strip() if self.quickvote['message']: IRC.sendmsg( self, CONF['IRC_CHANNEL'], "@chat", CONF['MESSAGE']['VOTESTART'] + " (" + str(self.quickvote['message']) + ")" ) else: IRC.sendmsg( self, CONF['IRC_CHANNEL'], "@chat", CONF['MESSAGE']['VOTESTART'] ) return def random(self, msg): """ !random command Read a random line from randomfile and put it into msg_queue If no file is given in msg a standard file will be used :param str msg: The IRC message triggering the command :return bool: """ randomfile = msg.replace('!random', '').strip().lower() if randomfile: randomfile = f"random_{os.path.basename(randomfile)}.txt" else: randomfile = "random.txt" try: with open(randomfile,"r", encoding="utf-8") as file: lines = file.read().splitlines() random_msg = random.choice(lines) except FileNotFoundError: logging.error('%s not found', randomfile) return False raw_msg = { "TTS": True, "msg": random_msg, "badges": True, "subscriber": True, "msgid": True, "user": CONF['IRC_USERNAME'], "length": CONF['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] return True def ptts(self, msg): """ !ptts command Add user to tts_allowed list and remove user from tts_denied list :param str msg: The IRC message triggering the command """ user = msg.replace('!ptts', '').strip().lower() if user.startswith('@'): logging.debug('Removing "@" from username') user = user.replace('@', '') logging.info("Adding %s to whitelist", user) self.tts['whitelist'].append(user) if user in self.tts['blacklist']: logging.info("Removing %s from deny list", user) self.tts['blacklist'].remove(user) def dtts(self, msg): """ !dtts command Add user to tts_denied list and remove user from tts_allowed list :param str msg: The IRC message triggering the command """ user = msg.replace('!dtts', '').strip().lower() if user.startswith('@'): logging.debug('Removing "@" from username') user = user.replace('@', '') if user not in self.tts['blacklist']: logging.info("Adding %s to deny list", user) self.tts['blacklist'].append(user) if user in self.tts['whitelist']: logging.info("Removing %s from allowed list", user) self.tts['whitelist'].remove(user) class ThreadingSimpleServer(ThreadingMixIn, HTTPServer): """ Threaded HTTP Server """ class HTTPserv(BaseHTTPRequestHandler): """ Simple HTTP Server """ def log_message(self, format, *args): # pylint: disable=redefined-builtin """ Suppress HTTP log messages """ return def do_GET(self): # pylint: disable=invalid-name """ Process GET requests """ if self.path == '/': self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() with open("tts.html", "rb") as file: html = file.read() self.wfile.write(html) elif self.path == '/favicon.ico': self.send_response(200) self.send_header('Content-type', 'image/x-icon') self.end_headers() with open("favicon.ico", "rb") as file: icon = file.read() self.wfile.write(icon) elif self.path == '/tts.js': self.send_response(200) self.send_header('Content-type', 'text/javascript') self.end_headers() with open("tts.js", "rb") as file: html = file.read() self.wfile.write(html) elif self.path == '/jquery.js': self.send_response(200) self.send_header('Content-type', 'text/javascript') self.end_headers() with open("jquery.js", "rb") as file: html = file.read() self.wfile.write(html) elif self.path == '/bootstrap.min.css': self.send_response(200) self.send_header('Content-type', 'text/css') self.end_headers() with open("bootstrap.min.css", "rb") as file: html = file.read() self.wfile.write(html) elif self.path.startswith('/tts_queue'): tts_json = "" tts = {} self.send_response(200) self.send_header('Content-type', 'text/json') self.end_headers() usermap = CONF['USERMAP'] sorted_tts = {k: msg_queue[k] for k in sorted(msg_queue, reverse=True)} logging.debug(usermap) for key in list(sorted_tts.keys()): if key not in tts_done: if msg_queue[key][0].lower() in usermap: logging.debug('Using usermap for user: %s (%s)', msg_queue[key][0], usermap[msg_queue[key][0].lower()] ) user = usermap[msg_queue[key][0].lower()] tts = {str(key): f"{user} {CONF['MESSAGE']['SAYS']}: {msg_queue[key][1]}"} else: logging.debug('No usermap entry found for user: %s', msg_queue[key][0]) tts = { str(key): f"{msg_queue[key][0]} {CONF['MESSAGE']['SAYS']}: {msg_queue[key][1]}" } tts_json = json.dumps(tts) self.wfile.write(bytes(str(tts_json)+"\n", "utf-8")) elif self.path.startswith('/tts_done'): get_params = parse_qs(self.path) if '/tts_done?id' in get_params: logging.info("Removing message from queue") tts_done.append(get_params['/tts_done?id'][0]) self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("OK\n", "utf-8")) else: self.send_response(500) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("Internal Server error\n", "utf-8")) elif self.path.startswith('/token'): data = {} data['client_id'] = "ebo548vs6tq54c9zlrgin2yfzzlrrs" data['response_type'] = "token" data['scope'] = "chat:edit chat:read" if CONF['HTTP_PORT'] == 80: data['redirect_uri'] = "http://localhost/token" elif CONF['HTTP_PORT'] == 8080: data['redirect_uri'] = "http://localhost:8080/token" elif CONF['HTTP_PORT'] == 3000: data['redirect_uri'] = "http://localhost:3000/token" else: self.send_response(500) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("You can only use this function \ if HTTP_PORT is 80, 8080 or 3000. \ Please change your port, or use \ https://www.21x9.org/twitch instead.\n", "utf-8") ) return False try: url_values = urllib.parse.urlencode(data) url = "https://id.twitch.tv/oauth2/authorize" full_url = url + "?" + url_values data = urllib.request.urlopen(full_url) if data: self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(bytes("\ \ OAuth Token Generator\ \ \
\ \ Click to start the OAuth process.\ \
\ \ \ \n", "utf-8") ) else: self.send_response(500) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("Could not get OAuth-URL from Twitch\n", "utf-8")) except Exception: logging.error('Could not fetch OAuth-URL from Twitch.') else: self.send_response(404) self.send_header('Server', 'TTS') self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("File not found.\n", "utf-8")) return True def http_serve_forever(httpd): """ httpd loop """ httpd.serve_forever() def send_tts_queue(): """ Send messages to TTS """ for raw_msg in MSG_QUEUE_RAW: logging.debug('Raw msg: %s', MSG_QUEUE_RAW) now = datetime.datetime.now() if now - raw_msg['queuetime'] > datetime.timedelta(seconds=CONF['IRC_CLEARMSG_TIMEOUT']): logging.debug('clearmsg_timeout reached') if raw_msg['timestamp'] not in msg_queue: logging.info('Sending TTS message') msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] logging.debug("msg_queue: %s", msg_queue) else: logging.debug('Msg is already in queue') def get_url(path=False): """ Generate a valid URL from config values """ if CONF['HTTP_BIND'] == "0.0.0.0": url = "localhost" else: url = CONF['HTTP_BIND'] url = f"http://{url}:{CONF['HTTP_PORT']}/" if path: url = f"{url}{path}" return url def check_oauth_token(): """ Check for valid authentication via Twitch API """ global CONF logging.debug('Checking OAuth Token') try: url = 'https://id.twitch.tv/oauth2/validate' oauth = CONF['IRC_OAUTH_TOKEN'].replace('oauth:','') oauth = f"OAuth {oauth}" request = urllib.request.Request(url) request.add_header('Authorization', oauth) urllib.request.urlopen(request) except HTTPError: logging.fatal('Twitch rejected your OAuth Token. Please check and generate a new one.') logging.info('Please open http://%s:%s/token to generate your OAuth-Token.', CONF['HTTP_BIND'], CONF['HTTP_PORT'] ) url = get_url("token") webbrowser.open_new_tab(url) logging.info('Please complete the OAuth process and add the token into your "config.yml" \ within the next 5 minutes.') time.sleep(300) CONF = load_config() check_oauth_token() logging.info('OAuth Token is valid') return CONF def load_config(): """ Loading config variables """ logging.info("Loading configfile") try: with open("config.yml", "r", encoding="UTF-8") as ymlfile: cfg = yaml.load(ymlfile, Loader=yaml.Loader) except FileNotFoundError: logging.fatal('Your config file is missing, please copy config-dist.yml \ to config.yml and review your settings.') sys.exit(253) for section in cfg: try: logging.debug('Fetching config: %s', section) CONF['IRC_CHANNEL'] = cfg.get('irc', {}).get( 'channel', False ) CONF['IRC_USERNAME'] = cfg.get('irc', {}).get( 'username', False ) CONF['IRC_OAUTH_TOKEN'] = cfg.get('irc', {}).get( 'oauth_token', False ) CONF['IRC_SERVER'] = cfg.get('irc', {}).get( 'server', "irc.chat.twitch.tv" ) CONF['IRC_CLEARMSG_TIMEOUT'] = cfg.get( 'irc', {}).get('clearmsg_timeout', 60 ) CONF['IRC_SUBONLY'] = cfg.get('bot', {}).get( 'subonly', False ) CONF['IRC_MODONLY'] = cfg.get('bot', {}).get( 'modonly', False ) CONF['IRC_TTS_LEN'] = cfg.get('bot', {}).get( 'message_length', 200 ) CONF['TTS_STARTENABLED'] = cfg.get('bot', {}).get( 'start_enabled', True ) CONF['WIKI_LANG'] = cfg.get('bot', {}).get( 'language', 'en' ) CONF['LOG_LEVEL'] = cfg.get('log', {}).get( 'level', "INFO" ) CONF['HTTP_PORT'] = cfg.get('http', {}).get( 'port', 80 ) CONF['HTTP_BIND'] = cfg.get('http', {}).get( 'bind', "localhost" ) CONF['MESSAGE'] = {} CONF['MESSAGE']['TOFF'] = cfg.get('messages', {}).get( 'toff', "TTS is now disabled." ) CONF['MESSAGE']['TON'] = cfg.get('messages', {}).get( 'ton', "TTS is now active." ) CONF['MESSAGE']['TOO_LONG'] = cfg.get('messages', {}).get( 'too_long', "Sorry, your message is too long." ) CONF['MESSAGE']['DISABLED'] = cfg.get('messages', {}).get( 'disabled', "Sorry, TTS is disabled." ) CONF['MESSAGE']['DENIED'] = cfg.get('messages', {}).get( 'denied', "Sorry, you're not allowed to use TTS." ) CONF['MESSAGE']['SUBONLY'] = cfg.get('messages', {}).get( 'subonly', "Sorry, TTS is sub-only." ) CONF['MESSAGE']['MODONLY'] = cfg.get('messages', {}).get( 'modonly', "Sorry, TTS is mod-only." ) CONF['MESSAGE']['READY'] = cfg.get('messages', {}).get( 'ready', "TTS bot is ready." ) CONF['MESSAGE']['WHITELISTONLY'] = cfg.get('messages', {}).get( 'whitelist', False ) CONF['MESSAGE']['SAYS'] = cfg.get('messages', {}).get( 'says', "says" ) CONF['MESSAGE']['VOTESTART'] = cfg.get('messages', {}).get( 'votestart', "Quickvote started. Send #yourchoice to participate." ) CONF['MESSAGE']['VOTEEND'] = cfg.get('messages', {}).get( 'voteend', "Quickvote ended. The results are:" ) CONF['MESSAGE']['VOTENOBODY'] = cfg.get('messages', {}).get( 'votenobody', "Nobody casted a vote. :(" ) CONF['MESSAGE']['VOTERESULT'] = cfg.get('messages', {}).get( 'voteresult', "Voting has ended. The result is:" ) CONF['MESSAGE']['VOTES'] = cfg.get('messages', {}).get( 'votes', "Votes" ) CONF['MESSAGE']['PICKSTART'] = cfg.get('messages', {}).get( 'pickstart', "Pick started. Send #pickme to participate." ) CONF['MESSAGE']['PICKRESULT'] = cfg.get('messages', {}).get( 'pickresult', "Pick ended. The results are:" ) CONF['MESSAGE']['PICKNONE'] = cfg.get('messages', {}).get( 'picknone', "Pick ended. Nobody was picked." ) CONF['MESSAGE']['QUOTE_NOT_FOUND'] = cfg.get('messages', {}).get( 'quotenotfound', "Sorry, no quote found." ) CONF['MESSAGE']['QUOTE_ADDED_PREFIX'] = cfg.get('messages', {}).get( 'quoteaddedprefix', "Quote:" ) CONF['MESSAGE']['QUOTE_ADDED_SUFFIX'] = cfg.get('messages', {}).get( 'quoteaddedsuffix', "added." ) CONF['MESSAGE']['WIKI_TOO_MANY'] = cfg.get('messages', {}).get( 'wiki_too_many', "Sorry, there are too many possible results. \ Try a more narrow search." ) CONF['MESSAGE']['WIKI_NO_RESULT'] = cfg.get('messages', {}).get( 'wiki_no_result', "Sorry, there was an error fetching the wikipedia answer." ) CONF['FEATURE'] = {} CONF['FEATURE']['WIKI'] = cfg.get('features', {}).get( 'wiki', True ) CONF['FEATURE']['PICK'] = cfg.get('features', {}).get( 'pick', True ) CONF['FEATURE']['VOTE'] = cfg.get('features', {}).get( 'vote', True ) CONF['FEATURE']['QUOTE'] = cfg.get('features', {}).get( 'quote', True ) CONF['FEATURE']['RANDOM'] = cfg.get('features', {}).get( 'random', True ) CONF['FEATURE']['VERSION'] = cfg.get('features', {}).get( 'version', True ) CONF['FEATURE']['PING'] = cfg.get('features', {}).get( 'ping', True ) CONF['USERMAP'] = cfg.get('usermapping', []) if 'whitelist' in cfg: CONF['WHITELIST'] = True CONF['WHITELIST_USER'] = cfg['whitelist'] else: CONF['WHITELIST'] = False except KeyError: logging.exception('Your config file is invalid, please check and try again.') sys.exit(254) if CONF['WHITELIST']: logging.info('Whitelist mode enabled') logging.debug('Whitelist: %s', CONF['WHITELIST_USER']) if not CONF['IRC_CHANNEL']: raise ValueError('Please add your twitch channel to config.yml.') if not CONF['IRC_USERNAME']: raise ValueError('Please add the bots username to config.yml.') if not CONF['IRC_OAUTH_TOKEN']: CONF['IRC_OAUTH_TOKEN'] = "Invalid" return CONF if not CONF['IRC_OAUTH_TOKEN'].startswith('oauth:'): raise ValueError('Your oauth-token is invalid, it has to start with: "oauth:"') return CONF def main(): """ Main loop """ global CONF CONF = load_config() lastreload = datetime.datetime.now() logging.getLogger().setLevel(CONF['LOG_LEVEL']) if CONF['LOG_LEVEL'] == 'DEBUG': sys.tracebacklimit = 5 logging.info("Starting Webserver") httpd = ThreadingSimpleServer( (CONF['HTTP_BIND'], CONF['HTTP_PORT']), HTTPserv ) http_thread = Thread(target=http_serve_forever, daemon=True, args=(httpd, )) http_thread.start() check_oauth_token() logging.info("Starting IRC bot") irc = IRC() irc.connect( CONF['IRC_SERVER'], 6667, CONF['IRC_CHANNEL'], CONF['IRC_USERNAME'], CONF['IRC_OAUTH_TOKEN'] ) irc.sendmsg( CONF['IRC_CHANNEL'], 'MrDestructoid', CONF['MESSAGE']['READY'] ) logging.info('Connected and joined') url = get_url() logging.info("Please open your browser and visit: %s", url) webbrowser.open_new_tab(url) while True: if CONF['LOG_LEVEL'] == "DEBUG": time.sleep(1) try: irc.get_response() confreload = datetime.datetime.now() if confreload - lastreload > datetime.timedelta(seconds=60): CONF = load_config() lastreload = datetime.datetime.now() if irc.quickvote['status'] and irc.quickvote['message']: logging.info('Quickvote is active') irc.sendmsg( CONF['IRC_CHANNEL'], "@chat", f"{CONF['MESSAGE']['VOTESTART']} ({irc.quickvote['message']})" ) if not irc.tts['status']: continue logging.debug('MSG_QUEUE_RAW: %s', MSG_QUEUE_RAW) send_tts_queue() except KeyboardInterrupt: httpd.shutdown() logging.info('Exiting...') os.kill(os.getpid(), signal.SIGTERM) if __name__ == "__main__": logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(module)s %(threadName)s %(levelname)s: %(message)s' ) sys.tracebacklimit = 3 VERSION = "1.7.1" CONF = {} tts_done = [] MSG_QUEUE_RAW = [] msg_queue = {} if sys.argv[1:]: if sys.argv[1] == "--version": print('Simple TTS Bot') print('Version %s', VERSION) sys.exit(1) main()