#!/usr/bin/python3
# -*- coding: utf-8 -*-
# pylint: disable=line-too-long

"""
    TwitchTTS
    Copyright (C) 2022 gpkvt

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""

import datetime
import html
import json
import logging
import socket
import socketserver
import sys
import time
import urllib.parse
import urllib.request
from collections import Counter
from http.server import BaseHTTPRequestHandler
from threading import Thread
from urllib.parse import parse_qs

try:
    import yaml
except ImportError:
    # Reported with a readable message by load_config() instead of a bare
    # traceback at import time.
    yaml = None

# Shared state. The IRC loop (main thread) and the HTTP handler (daemon
# thread) both hold references to these objects, so they are only ever
# mutated in place and never rebound.
conf = {}           # Active configuration, filled by load_config()
tts_done = []       # Ids of messages the browser reported as spoken
msg_queue_raw = []  # Accepted !tts messages still inside the CLEARMSG grace period
msg_queue = {}      # timestamp -> [user, text], served to the browser

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(module)s %(threadName)s %(levelname)s: %(message)s')
sys.tracebacklimit = 0

# (conf['MESSAGE'] key, key in the "messages" section of config.yml, fallback)
MESSAGE_DEFAULTS = (
    ('TOFF', 'toff', "TTS is now disabled."),
    ('TON', 'ton', "TTS is now active."),
    ('TOO_LONG', 'too_long', "Sorry, your message is too long."),
    ('DISABLED', 'disabled', "Sorry, TTS is disabled."),
    ('DENIED', 'denied', "Sorry, you're not allowed to use TTS."),
    ('SUBONLY', 'subonly', "Sorry, TTS is sub-only."),
    ('MODONLY', 'modonly', "Sorry, TTS is mod-only."),
    ('READY', 'ready', "TTS bot is ready."),
    # The fallback used to be False, which made sendmsg() fail on string
    # concatenation whenever the whitelist rejected somebody.
    ('WHITELISTONLY', 'whitelist', "Sorry, you're not allowed to use TTS."),
    ('SAYS', 'says', "says"),
    ('VOTESTART', 'votestart', "Quickvote started. Send #yourchoice to participate."),
    ('VOTEEND', 'voteend', "Quickvote ended. The results are:"),
    ('VOTENOBODY', 'votenobody', "Nobody casted a vote. :("),
    ('VOTES', 'votes', "Stimmen"),
)


def parse_irc_line(line):
    """
    Split one IRC line into its parts

    Tags are only read from the leading "@..." section of the line, so text
    typed by a chat user (e.g. "hi;badges=broadcaster/1") can never be
    mistaken for a tag.

    Parameters:
        line (str): A single IRC line without the trailing CRLF

    Returns:
        tuple: (tags, prefix, command, trailing) where tags is a dict of
        tag name to raw value, prefix is the sender without the leading ":",
        command is e.g. "PRIVMSG" and trailing is the text after the first
        " :" (an empty string if there is none)
    """

    tags = {}
    prefix = ''

    if line.startswith('@'):
        raw_tags, _, line = line[1:].partition(' ')
        for tag in raw_tags.split(';'):
            key, _, value = tag.partition('=')
            tags[key] = value

    line = line.lstrip(' ')
    if line.startswith(':'):
        prefix, _, line = line[1:].partition(' ')

    head, _, trailing = line.partition(' :')
    fields = head.split()
    command = fields[0] if fields else ''

    return tags, prefix, command, trailing


def promote_due_messages(now=None):
    """
    Move raw messages that outlived the CLEARMSG grace period into msg_queue

    Messages wait in msg_queue_raw for conf['IRC_CLEARMSG_TIMEOUT'] seconds so
    that a moderator can still delete them. Promoted messages are removed
    from msg_queue_raw, which keeps that list from growing for the whole
    runtime of the bot.

    Parameters:
        now (datetime.datetime): Reference time, defaults to the current time

    Returns:
        int: Number of messages added to msg_queue
    """

    if now is None:
        now = datetime.datetime.now()

    grace_period = datetime.timedelta(seconds=conf['IRC_CLEARMSG_TIMEOUT'])
    still_waiting = []
    promoted = 0

    for raw_msg in list(msg_queue_raw):
        if now - raw_msg['queuetime'] <= grace_period:
            still_waiting.append(raw_msg)
            continue

        logging.debug('clearmsg_timeout reached')
        if raw_msg['timestamp'] not in msg_queue:
            logging.info('Sending TTS message')
            msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']]
            promoted += 1
        else:
            logging.debug('Msg is already in queue')

    # In-place update: other references to the list stay valid.
    msg_queue_raw[:] = still_waiting
    return promoted


class IRC:
    """IRC bot"""

    # Placeholder only, the real socket is created per instance in __init__.
    # (Creating a socket here would open one at import time.)
    irc = None
    # Received bytes that are not yet terminated by CRLF
    _recv_buffer = b""

    def __init__(self):
        self.irc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._recv_buffer = b""
        self.tts_denied = []
        # User names are lowercased before every lookup, so the configured
        # names have to be lowercased as well. Copying also keeps !ptts and
        # !dtts from modifying the list stored in conf.
        self.tts_allowed = [str(name).lower() for name in conf.get('WHITELIST_USER') or []]
        self.tts_status = conf['TTS_STARTENABLED']
        self.quickvote = False
        self.votemsg = False
        self.poll = {}
        self.pollcount = 0

    def _send_line(self, line):
        """Send one IRC line, sendall() makes sure nothing is cut off"""
        self.irc.sendall(bytes(line + "\r\n", "UTF-8"))

    def connect(self, server, port, channel, botnick, botpass):
        """
        Connect to Twitch IRC servers

        Parameters:
            server (str): Hostname of the IRC server
            port (int): Port of the IRC server
            channel (str): Channel to join
            botnick (str): Username of the bot
            botpass (str): OAuth token of the bot

        Exits with code 252 if the connection can not be established.
        """

        logging.info("Connecting to: %s", server)
        try:
            self.irc.connect((server, port))
        except OSError:
            # Covers refused/reset connections as well as DNS failures
            logging.fatal('Twitch refused to connect, please check your settings and try again.')
            sys.exit(252)

        self.irc.settimeout(1)

        self._send_line("PASS " + botpass)
        self._send_line("USER " + botnick + " " + botnick + " " + botnick + " :python")
        self._send_line("NICK " + botnick)
        self._send_line("CAP REQ :twitch.tv/commands twitch.tv/tags ")
        time.sleep(5)

        try:
            self._send_line("JOIN " + channel)
        except ConnectionResetError:
            logging.warning('JOIN was refused, will try again in 5 seconds.')
            time.sleep(5)
            logging.warning('Please check your credentials, if this error persists.')
            self._send_line("JOIN " + channel)

    def sendmsg(self, channel, user, msg):
        """
        Send (a public) message to IRC channel

        Parameters:
            channel (str): Channel to post msg to
            user (str): User to address message to
            msg (str): Message to send
        """

        self._send_line("PRIVMSG " + channel + " :" + user + " " + msg)

    def get_response(self):
        """
        Get and process response from IRC

        Reads once from the socket and handles every complete line that is
        available. An incomplete line (or a multi-byte character split
        between two reads) stays in the buffer until the rest arrives.

        Returns:
            bool: True if at least one line was handled as a command,
            False otherwise (including a read timeout)
        """

        try:
            chunk = self.irc.recv(2048)
        except socket.timeout:
            return False
        except Exception: # pylint: disable=broad-except
            logging.exception('An unknown error occured while getting a IRC response.')
            sys.exit(255)

        if not chunk:
            # recv() returns empty data once the peer closed the connection.
            # Without this check the main loop would spin on a dead socket.
            logging.fatal('Connection to the IRC server was closed.')
            sys.exit(255)

        self._recv_buffer += chunk
        *raw_lines, self._recv_buffer = self._recv_buffer.split(b"\r\n")

        handled = False
        for raw_line in raw_lines:
            line = raw_line.decode("UTF-8", errors="replace")
            logging.debug('resp:')
            logging.debug(line)
            if line and self._handle_line(line):
                handled = True

        return handled

    def _handle_line(self, line):
        """Dispatch one IRC line by its command"""
        tags, prefix, command, trailing = parse_irc_line(line)

        if command == 'PING':
            logging.debug('PING received')
            self._send_line('PONG :tmi.twitch.tv')
            return False

        if command == 'CLEARMSG':
            logging.info('CLEARMSG received')
            return self._handle_clearmsg(tags)

        if command == 'NOTICE':
            if 'Login authentication failed' in trailing:
                logging.error('Login failed, please check your credentials and try again.')
                sys.exit(251)
            return False

        if command == 'PRIVMSG':
            logging.debug('PRIVMSG received')
            return self._handle_privmsg(tags, prefix, trailing)

        return False

    def _handle_clearmsg(self, tags):
        """Drop a message a moderator deleted from the raw queue"""
        msgid = tags.get('target-msg-id', False)
        logging.debug('Trying to suppress message')
        logging.debug(msgid)

        kept = []
        for queued in list(msg_queue_raw):
            if queued['msgid'] == msgid:
                logging.info('Suppressing message %s', msgid)
            else:
                kept.append(queued)

        msg_queue_raw[:] = kept
        return True

    def _handle_privmsg(self, tags, prefix, msg):
        """Process one chat message: moderator commands, votes and !tts"""
        badges = tags.get('badges', '')
        # A missing tag counts as "not subscribed"
        subscriber = tags.get('subscriber', '0')
        msgid = tags.get('id', False)
        # Fall back to the login name of the sender if no display name is set
        user = (tags.get('display-name') or prefix.split('!', 1)[0]).lower()
        msglen = len(msg)
        privileged = 'broadcaster' in badges or 'moderator' in badges

        logging.debug('Subscriber: %s', subscriber)
        logging.debug('Message ID: %s', msgid)
        logging.debug('Username: %s', user)
        logging.debug('Msg: %s', msg)
        logging.debug('Msg length: %s', msglen)
        logging.debug('Deny List:')
        logging.debug(self.tts_denied)

        # NOTE: The indentation of this section was lost in the source this
        # was restored from. All commands that change bot state are treated
        # as moderator/broadcaster only, casting a vote is open to everyone.
        if privileged:
            if msg.startswith('!dtts'):
                logging.debug("!dtts command detected")
                return self._deny_user(self._command_target(msg, '!dtts'))

            if msg.startswith('!ping'):
                logging.debug("Ping check received.")
                self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), "Pong!")
                return True

            if msg.startswith('!quickvote'):
                logging.info("!quickvote command detected")
                return self._toggle_quickvote(msg)

            if msg.startswith('!ptts'):
                logging.debug("!ptts command detected")
                return self._permit_user(self._command_target(msg, '!ptts'))

            if msg.startswith('!toff'):
                logging.info('TTS is now turned off')
                return self._switch_tts(False, user, 'TOFF')

            if msg.startswith('!ton'):
                logging.info('TTS is now turned on')
                return self._switch_tts(True, user, 'TON')

        if msg.startswith('#') and self.quickvote is True:
            logging.info('Quickvote: Cast detected')
            self.pollcount += 1
            # One entry per user, voting again replaces the earlier vote
            self.poll[user] = msg.lower()
            logging.debug(self.poll)

        if msg.startswith('!tts'):
            logging.debug('!tts command detected')
            return self._handle_tts(msg, user, badges, subscriber, msgid)

        return False

    @staticmethod
    def _command_target(msg, command):
        """Return the lowercased user name given after a command, without '@'"""
        target = msg.replace(command, '', 1).strip().lower()
        if target.startswith('@'):
            logging.debug('Removing "@" from username')
            target = target.replace('@', '')
        return target

    def _deny_user(self, target):
        """Add a user to the deny list and remove them from the allowed list"""
        if not target:
            return True

        if target not in self.tts_denied:
            logging.info("Adding %s to deny list", target)
            self.tts_denied.append(target)

        if target in self.tts_allowed:
            logging.info("Removing %s from allowed list", target)
            self.tts_allowed.remove(target)

        return True

    def _permit_user(self, target):
        """Add a user to the allowed list and remove them from the deny list"""
        if not target:
            return True

        if target not in self.tts_allowed:
            logging.info("Adding %s to whitelist", target)
            self.tts_allowed.append(target)

        if target in self.tts_denied:
            logging.info("Removing %s from deny list", target)
            self.tts_denied.remove(target)

        return True

    def _switch_tts(self, enabled, user, message_key):
        """Turn TTS on or off, both directions drop everything still queued"""
        msg_queue.clear()
        msg_queue_raw.clear()
        self.tts_status = enabled
        self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE'][message_key])
        return True

    def _toggle_quickvote(self, msg):
        """Start a quickvote, or stop the running one and post the results"""
        channel = conf['IRC_CHANNEL']

        if not self.quickvote:
            logging.debug('Quickvote started')
            self.quickvote = True
            self.votemsg = msg.split('!quickvote', 1)[1].strip()

            announcement = conf['MESSAGE']['VOTESTART']
            if self.votemsg:
                announcement += " (" + str(self.votemsg) + ")"
            self.sendmsg(channel, "@chat", announcement)
            return True

        logging.debug('Quickvote stopped')
        nobody_voted = self.pollcount == 0
        results = Counter(self.poll.values()).most_common(5)

        self.quickvote = False
        self.poll = {}
        self.pollcount = 0

        self.sendmsg(channel, "@chat", conf['MESSAGE']['VOTEEND'])

        if nobody_voted:
            logging.info("Nobody voted")
            self.sendmsg(channel, "*", conf['MESSAGE']['VOTENOBODY'])
            return False

        logging.info("Counting votes")
        logging.debug(results)
        for choice, votes in results:
            self.sendmsg(
                channel, "*",
                str(choice)+" ("+str(votes)+ " "+ conf['MESSAGE']['VOTES'] + ")"
            )
        return True

    def _refuse(self, user, message_key):
        """Tell the user why their !tts message was not accepted"""
        self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE'][message_key])
        return False

    def _handle_tts(self, msg, user, badges, subscriber, msgid):
        """Check a !tts message against all restrictions and queue it"""
        msglen = len(msg)
        privileged = 'moderator' in badges or 'broadcaster' in badges

        if msglen > conf['IRC_TTS_LEN']:
            logging.info('TTS message is to long')
            return self._refuse(user, 'TOO_LONG')

        logging.debug("tts status: %s", self.tts_status)
        logging.debug(conf['TTS_STARTENABLED'])

        if not self.tts_status:
            logging.info('TTS is disabled')
            return self._refuse(user, 'DISABLED')

        if user in self.tts_denied:
            logging.info("%s is not allowed to use TTS", user)
            return self._refuse(user, 'DENIED')

        if conf['IRC_SUBONLY']:
            if subscriber != "0" or privileged:
                logging.debug('TTS is sub-only and user has allowance')
            else:
                logging.info('TTS is sub-only')
                return self._refuse(user, 'SUBONLY')

        if conf['IRC_MODONLY']:
            if privileged:
                logging.debug('TTS is mod-only and user has allowance')
            else:
                logging.info('TTS is mod-only')
                return self._refuse(user, 'MODONLY')

        if conf['WHITELIST']:
            # Whitelisted users pass. The former if/else rejected every
            # message, no matter which "if" the "else" belonged to.
            if not self.tts_allowed:
                logging.info('Nobody is on the whitelist.')
                return self._refuse(user, 'WHITELISTONLY')

            if user not in self.tts_allowed:
                logging.info('User is not on whitelist')
                logging.info(self.tts_allowed)
                return self._refuse(user, 'WHITELISTONLY')

        logging.info('Valid TTS message, adding to raw queue')
        msg_queue_raw.append({
            "TTS": True,
            "msg": msg.replace('!tts', '', 1),
            "badges": badges,
            "subscriber": subscriber,
            "msgid": msgid,
            "user": user,
            "length": msglen,
            "queuetime": datetime.datetime.now(),
            "timestamp": str(time.time_ns())
        })
        return True


class HTTPserv(BaseHTTPRequestHandler):
    """Simple HTTP Server"""

    # Request path -> (file in the working directory, content type)
    STATIC_FILES = {
        '/': ('tts.html', 'text/html'),
        '/favicon.ico': ('favicon.ico', 'image/x-icon'),
        '/tts.js': ('tts.js', 'text/javascript'),
        '/jquery.js': ('jquery.js', 'text/javascript'),
        '/bootstrap.min.css': ('bootstrap.min.css', 'text/css'),
    }

    # HTTP_PORT -> redirect URI handed to Twitch by the token page
    OAUTH_REDIRECTS = {
        80: "http://localhost/token",
        8080: "http://localhost:8080/token",
        3000: "http://localhost:3000/token",
    }

    def log_message(self, format, *args): # pylint: disable=redefined-builtin
        """Suppress HTTP log messages"""
        return

    def _respond(self, status, content_type, body, extra_headers=()):
        """Send a complete response, body is either str or bytes"""
        if isinstance(body, str):
            body = bytes(body, "utf-8")

        self.send_response(status)
        for name, value in extra_headers:
            self.send_header(name, value)
        self.send_header('Content-type', content_type)
        self.end_headers()
        self.wfile.write(body)

    def _not_found(self):
        """Send the 404 response"""
        self._respond(404, 'text/plain', "File not found.\n", extra_headers=(('Server', 'TTS'),))

    def do_GET(self): # pylint: disable=invalid-name
        """Process GET requests"""
        if self.path in self.STATIC_FILES:
            self._serve_file(*self.STATIC_FILES[self.path])
        elif self.path.startswith('/tts_queue'):
            self._serve_queue()
        elif self.path.startswith('/tts_done'):
            self._mark_done()
        elif self.path.startswith('/token') and conf['IRC_OAUTH_TOKEN'] == "Invalid":
            self._serve_token_page()
        else:
            self._not_found()

    def _serve_file(self, filename, content_type):
        """Send a static file, or 404 if it can not be read"""
        try:
            # Read before any header is sent, otherwise a missing file
            # would leave the client with a "200" and no body.
            with open(filename, "rb") as file:
                content = file.read()
        except OSError:
            logging.error('Could not read %s', filename)
            self._not_found()
            return

        self._respond(200, content_type, content)

    def _serve_queue(self):
        """Send the oldest message that was not spoken yet as JSON"""
        usermap = conf['USERMAP']
        pending = {}

        # Keys are time.time_ns() strings, ascending order is oldest first
        for key in sorted(list(msg_queue)):
            if key in tts_done:
                continue

            entry = msg_queue.get(key)
            if entry is None:
                # Queue was cleared by the IRC thread in the meantime
                continue

            speaker, text = entry[0], entry[1]
            if str(speaker).lower() in usermap:
                speaker = usermap[str(speaker).lower()]

            pending = {str(key): str(speaker) + " " + str(conf['MESSAGE']['SAYS']) + ":" + str(text)}
            # Stop at the first hit, so messages are spoken in the order
            # they were written (the loop used to end on the newest one).
            break

        self._respond(200, 'text/json', json.dumps(pending) + "\n")

    def _mark_done(self):
        """Remember the message id from /tts_done?id=... as spoken"""
        query = parse_qs(urllib.parse.urlparse(self.path).query)

        if 'id' in query:
            logging.info("Removing message from queue")
            tts_done.append(query['id'][0])
            self._respond(200, 'text/plain', "OK\n")
        else:
            self._respond(500, 'text/plain', "Internal Server error\n")

    def _serve_token_page(self):
        """Send the page that starts the Twitch OAuth process"""
        redirect_uri = self.OAUTH_REDIRECTS.get(conf['HTTP_PORT'])
        if redirect_uri is None:
            self._respond(500, 'text/plain', "You can only use this function if HTTP_PORT is 80, 8080 or 3000. Please change your port, or use https://www.21x9.org/twitch instead.\n")
            return

        data = {
            'client_id': "ebo548vs6tq54c9zlrgin2yfzzlrrs",
            'response_type': "token",
            'scope': "chat:edit chat:read",
            'redirect_uri': redirect_uri,
        }
        full_url = "https://id.twitch.tv/oauth2/authorize" + "?" + urllib.parse.urlencode(data)

        try:
            # Only checks that Twitch answers for this URL
            with urllib.request.urlopen(full_url, timeout=10):
                pass
        except Exception: # pylint: disable=broad-except
            logging.error('Could not fetch OAuth-URL from Twitch.')
            # The client used to get no response at all in this case
            self._respond(500, 'text/plain', "Could not get OAuth-URL from Twitch\n")
            return

        # NOTE(review): The markup of this page was stripped in the source
        # this file was restored from, only the two texts survived. The link
        # target and the script that shows the token (Twitch appends it to
        # the redirect as "#access_token=...") are a reconstruction - please
        # compare with the original page.
        page = (
            '<html><head><title>OAuth Token Generator</title></head><body>'
            '<h1>OAuth Token Generator</h1>'
            '<p><a href="' + html.escape(full_url, quote=True) + '">'
            'Click to start the OAuth process.</a></p>'
            '<p id="token"></p>'
            '<script>'
            'var params = new URLSearchParams(window.location.hash.substring(1));'
            'if (params.get("access_token")) {'
            'document.getElementById("token").textContent = "oauth:" + params.get("access_token");'
            '}'
            '</script>'
            '</body></html>\n'
        )
        self._respond(200, 'text/html', page)


def http_serve_forever(httpd):
    """httpd loop"""
    httpd.serve_forever()


def load_config():
    """
    Loading config variables

    Reads config.yml from the working directory and updates the global conf
    dict in place, so every thread holding a reference sees the new values.

    Returns:
        dict: The global conf dict

    Raises:
        ValueError: If channel or username are empty or the OAuth token does
            not start with "oauth:"

    Exits with code 250 (PyYAML missing), 253 (config.yml missing) or 254
    (config.yml invalid).
    """

    logging.info("Loading configfile")

    if yaml is None:
        logging.fatal('PyYAML is not installed, please install it (pip install pyyaml) and try again.')
        sys.exit(250)

    try:
        with open("config.yml", "r", encoding="UTF-8") as ymlfile:
            # NOTE: This used to be yaml.load() with the full Loader, which
            # can construct arbitrary Python objects. The config only holds
            # plain scalars, lists and mappings, so safe_load is sufficient.
            cfg = yaml.safe_load(ymlfile)
    except FileNotFoundError:
        logging.fatal('Your config file is missing, please copy config-dist.yml to config.yml and review your settings.')
        sys.exit(253)
    except yaml.YAMLError:
        logging.exception('Your config file is invalid, please check and try again.')
        sys.exit(254)

    try:
        for section in cfg:
            logging.debug('Fetching config: %s', section)

        messages = cfg['messages']
        loaded = {
            'IRC_CHANNEL': cfg['irc']['channel'],
            'IRC_USERNAME': cfg['irc']['username'],
            'IRC_OAUTH_TOKEN': cfg['irc']['oauth_token'],
            'IRC_SERVER': cfg['irc']['server'] or "irc.chat.twitch.tv",
            'IRC_CLEARMSG_TIMEOUT': cfg['irc']['clearmsg_timeout'] or 60,
            'IRC_SUBONLY': cfg['bot']['subonly'] or False,
            'IRC_MODONLY': cfg['bot']['modonly'] or False,
            'IRC_TTS_LEN': cfg['bot']['message_length'] or 200,
            'TTS_STARTENABLED': cfg['bot']['start_enabled'] or False,
            'LOG_LEVEL': cfg['log']['level'] or "INFO",
            'HTTP_PORT': cfg['http']['port'] or 80,
            'HTTP_BIND': cfg['http']['bind'] or "localhost",
            'MESSAGE': {name: messages[key] or default for name, key, default in MESSAGE_DEFAULTS},
            'USERMAP': cfg['usermapping'] or {},
            # Whitelist mode is enabled by the mere presence of the key
            'WHITELIST': 'whitelist' in cfg,
            'WHITELIST_USER': cfg.get('whitelist') or [],
        }
    except (KeyError, TypeError):
        # TypeError: the file or one of its sections is empty
        logging.exception('Your config file is invalid, please check and try again.')
        sys.exit(254)

    # The new values were collected in a separate dict, so readers in the
    # HTTP thread never see a half-filled conf['MESSAGE'].
    conf.update(loaded)

    if conf['WHITELIST']:
        logging.info('Whitelist mode enabled')
        logging.debug('Whitelist:')
        logging.debug(conf['WHITELIST_USER'])

    if not conf['IRC_CHANNEL']:
        raise ValueError('Please add your twitch channel to config.yml.')
    if not conf['IRC_USERNAME']:
        raise ValueError('Please add the bots username to config.yml.')
    if not conf['IRC_OAUTH_TOKEN']:
        # Marker checked by main() and the /token page
        conf['IRC_OAUTH_TOKEN'] = "Invalid"
        return conf
    if not str(conf['IRC_OAUTH_TOKEN']).startswith('oauth:'):
        raise ValueError('Your oauth-token is invalid, it has to start with: "oauth:"')

    return conf


def main():
    """Main loop"""

    load_config()
    lastreload = datetime.datetime.now()
    logging.getLogger().setLevel(conf['LOG_LEVEL'])
    if conf['LOG_LEVEL'] == 'DEBUG':
        sys.tracebacklimit = 5

    logging.info("Starting Webserver")
    # The reuse flags are read while the socket is bound. Setting them after
    # the constructor already bound the socket had no effect, so binding is
    # deferred until they are in place.
    httpd = socketserver.TCPServer((conf['HTTP_BIND'], conf['HTTP_PORT']), HTTPserv, bind_and_activate=False)
    httpd.allow_reuse_address = True
    httpd.allow_reuse_port = hasattr(socket, 'SO_REUSEPORT')
    httpd.server_bind()
    httpd.server_activate()

    http_thread = Thread(target=http_serve_forever, daemon=True, args=(httpd, ))
    http_thread.start()

    if conf['IRC_OAUTH_TOKEN'] == "Invalid":
        logging.error('No OAuth Token, skipping start of IRC bot.')
        while True:
            logging.error('Please open http://%s:%s/token to generate your OAuth-Token.', conf['HTTP_BIND'], conf['HTTP_PORT'])
            time.sleep(10)

    logging.info("Starting IRC bot")
    irc = IRC()
    irc.connect(conf['IRC_SERVER'], 6667, conf['IRC_CHANNEL'], conf['IRC_USERNAME'], conf['IRC_OAUTH_TOKEN'])
    irc.sendmsg(conf['IRC_CHANNEL'], 'MrDestructoid', conf['MESSAGE']['READY'])

    logging.info("Please open your browser and visit: http://%s:%s/", conf['HTTP_BIND'], conf['HTTP_PORT'])

    while True:
        try:
            if conf['LOG_LEVEL'] == "DEBUG":
                time.sleep(1)

            irc.get_response()

            if not irc.tts_status:
                logging.debug("TTS is disabled")
                if conf['LOG_LEVEL'] == "DEBUG":
                    time.sleep(1)
                continue

            if datetime.datetime.now() - lastreload > datetime.timedelta(seconds=60):
                load_config()
                lastreload = datetime.datetime.now()

                # Reminder for a running vote. Tied to the reload interval,
                # otherwise it would be posted on every pass of this loop.
                if irc.quickvote and irc.votemsg:
                    logging.info('Quickvote is active')
                    irc.sendmsg(conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTESTART'] + " (" + str(irc.votemsg) + ")")

            logging.debug('Raw message queue:')
            logging.debug(msg_queue_raw)
            promote_due_messages()

        except KeyboardInterrupt:
            logging.info('Exiting...')
            sys.exit()


if __name__ == "__main__":
    main()