#!/usr/bin/python3 # -*- coding: utf-8 -*- # pylint: disable=line-too-long """ TwitchTTS Copyright (C) 2022 gpkvt This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ import os import sys import time import json import socket import random import logging import datetime import webbrowser import socketserver import urllib.parse import urllib.request from threading import Thread from collections import Counter from urllib.parse import parse_qs from http.server import BaseHTTPRequestHandler import yaml class IRC: """IRC bot""" irc = socket.socket() def __init__(self): self.irc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.tts_denied = [] self.tts_allowed = [] self.tts_status = conf['TTS_STARTENABLED'] self.quickvote = False self.votemsg = False self.poll = {} self.pollcount = 0 if 'WHITELIST_USER' in conf: self.tts_allowed = conf['WHITELIST_USER'] def connect(self, server, port, channel, botnick, botpass): """Connect to Twitch IRC servers""" logging.info("Connecting to: %s", server) try: self.irc.connect((server, port)) except ConnectionResetError: logging.fatal('Twitch refused to connect, please check your settings and try again.') sys.exit(252) self.irc.settimeout(1) self.irc.send(bytes("PASS " + botpass + "\r\n", "UTF-8")) self.irc.send(bytes( "USER " + botnick + " " + botnick +" " + botnick + " :python\r\n", "UTF-8") ) self.irc.send(bytes("NICK " + botnick + "\r\n", "UTF-8")) self.irc.send(bytes("CAP REQ :twitch.tv/commands twitch.tv/tags \r\n", "UTF-8")) time.sleep(5) try: self.irc.send(bytes("JOIN " + channel + "\r\n", "UTF-8")) except ConnectionResetError: logging.warning('JOIN was refused, will try again in 30 seconds.') time.sleep(30) logging.warning('Please check your credentials, if this error persists.') self.irc.send(bytes("JOIN " + channel + "\r\n", "UTF-8")) def sendmsg(self, channel, user, msg): """ Send (a public) message to IRC channel Parameters: channel (str): Channel to post msg to user (str): User to address message to msg (str): Message to send """ self.irc.send(bytes("PRIVMSG "+channel+" :"+user+" "+msg+"\r\n", "UTF-8")) def get_response(self): """Get and process response from IRC""" try: resp = self.irc.recv(2048).decode("UTF-8") logging.debug('resp:') logging.debug(resp) except socket.timeout: return False except Exception: # pylint: disable=broad-except logging.exception('An unknown error occured while getting a IRC response.') sys.exit(255) if resp.find('PING') != -1: logging.debug('PING received') self.irc.send(bytes('PONG :tmi.twitch.tv\r\n', "UTF-8")) if resp.find('CLEARMSG') != -1: logging.info('CLEARMSG received') msgid = False global msg_queue_raw # pylint: disable=global-statement,invalid-name filtered_msg_queue = [] tags = resp.split(';') for tag in tags: if "target-msg-id=" in tag: msgid = tag.rsplit('target-msg-id=',1)[1] logging.debug('Trying to suppress message') logging.debug(msgid) for msg in list(msg_queue_raw): if msg['msgid'] == msgid: logging.info('Suppressing message %s', msgid) else: filtered_msg_queue.append(msg) msg_queue_raw = filtered_msg_queue return True if resp.find('NOTICE') != -1: if 'Login authentication failed' in resp: try: raise RuntimeError() except RuntimeError: logging.exception('Login failed, please check your credentials and try again.') sys.exit(251) if resp.find('PRIVMSG') != -1: logging.debug('PRIVMSG received') badges = False subscriber = False msgid = False msg = False msglen = False user = False tts = False tags = resp.split(';') for tag in tags: if tag.startswith('badges='): badges = tag.rsplit('badges=',1)[1] if tag.startswith('subscriber='): subscriber = tag.rsplit('subscriber=',1)[1] logging.debug('Subscriber: %s', subscriber) if tag.startswith('id='): msgid = tag.rsplit('id=',1)[1] logging.debug('Message ID: %s', msgid) if tag.startswith('display-name='): user = tag.rsplit('display-name=',1)[1].lower() logging.debug('Username: %s', user) msg = resp.rsplit('PRIVMSG #',1)[1] msg = msg.split(':',1)[1] msg = msg.replace('\r\n','') msglen = len(msg) logging.debug('Msg: %s', msg) logging.debug('Msg length: %s', msglen) logging.debug('Deny List:') logging.debug(self.tts_denied) if 'broadcaster' in badges or 'moderator' in badges: if msg.startswith('!dtts'): logging.debug("!dtts command detected") user = msg.replace('!dtts', '').strip().lower() if user.startswith('@'): logging.debug('Removing "@" from username') user = user.replace('@', '') if user not in self.tts_denied: logging.info("Adding %s to deny list", user) self.tts_denied.append(user) if user in self.tts_allowed: logging.info("Removing %s from allowed list", user) self.tts_allowed.remove(user) return True if msg.startswith('!ping'): logging.debug("Ping check received.") self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), "Pong!") return True if msg.startswith('!random'): logging.info('!random command detected') randomfile = msg.replace('!random', '').strip().lower() if randomfile: randomfile = "random_"+str(os.path.basename(randomfile))+".txt" else: randomfile = "random.txt" try: with open(randomfile,"r", encoding="utf-8") as file: lines = file.read().splitlines() random_msg = random.choice(lines) except FileNotFoundError: logging.error('%s not found', randomfile) return False raw_msg = { "TTS": True, "msg": random_msg, "badges": True, "subscriber": True, "msgid": True, "user": conf['IRC_USERNAME'], "length": conf['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] return True if msg.startswith('!quickvote'): logging.info("!quickvote command detected") if self.quickvote: logging.debug('Quickvote stopped') if self.pollcount == 0: logging.info("Nobody voted") self.sendmsg(conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTEEND']) self.sendmsg(conf['IRC_CHANNEL'], "*", conf['MESSAGE']['VOTENOBODY']) raw_msg = { "TTS": True, "msg": conf['MESSAGE']['VOTENOBODY'], "badges": True, "subscriber": True, "msgid": True, "user": conf['IRC_USERNAME'], "length": conf['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] logging.info('The result is: %s', conf['MESSAGE']['VOTENOBODY']) logging.debug('Votemsg: %s', msg) self.quickvote = False self.poll = {} return False logging.info("Counting votes") count = 0 count = Counter(self.poll.values()).most_common(5) self.sendmsg(conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTEEND']) logging.debug(count) raw_msg = { "TTS": True, "msg": conf['MESSAGE']['VOTERESULT'] +" "+ str(count[0][0].replace('#','')), "badges": True, "subscriber": True, "msgid": True, "user": conf['IRC_USERNAME'], "length": conf['IRC_TTS_LEN'], "queuetime": datetime.datetime.now(), "timestamp": str(time.time_ns()) } msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] logging.info('The result is: %s', conf['MESSAGE']['VOTERESULT'] +" "+ str(count[0])) logging.debug('Votemsg: %s', msg) for key, value in count: self.sendmsg( conf['IRC_CHANNEL'], "*", str(key)+" ("+str(value)+ " "+ conf['MESSAGE']['VOTES'] + ")" ) self.quickvote = False self.poll = {} self.pollcount = 0 return True else: logging.debug('Quickvote started') self.quickvote = True self.votemsg = resp.split('!quickvote', 1)[1].strip() if self.votemsg: self.sendmsg( conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTESTART'] + " (" + str(self.votemsg) + ")" ) else: self.sendmsg(conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTESTART']) return True if msg.startswith('!ptts'): logging.debug("!ptts command detected") user = msg.replace('!ptts', '').strip().lower() if user.startswith('@'): logging.debug('Removing "@" from username') user = user.replace('@', '') logging.info("Adding %s to whitelist", user) self.tts_allowed.append(user) if user in self.tts_denied: logging.info("Removing %s from deny list", user) self.tts_denied.remove(user) return True if msg.startswith('#') and self.quickvote is True: logging.info('Quickvote: Cast detected') self.pollcount += 1 self.poll[user] = msg.lower() logging.debug(self.poll) if msg.startswith('!toff'): logging.info('TTS is now turned off') msg_queue.clear() msg_queue_raw.clear() self.tts_status = False self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TOFF']) return True if msg.startswith('!ton'): logging.info('TTS is now turned on') msg_queue.clear() msg_queue_raw.clear() self.tts_status = True self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TON']) return True if msg.startswith('!tts'): logging.debug('!tts command detected') if msglen > conf['IRC_TTS_LEN']: logging.info('TTS message is to long') self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['TOO_LONG']) return False logging.debug("tts status: %s", self.tts_status) logging.debug(conf['TTS_STARTENABLED']) if not self.tts_status: logging.info('TTS is disabled') self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['DISABLED']) return False if user in self.tts_denied: logging.info("%s is not allowed to use TTS", user) self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['DENIED']) return False if conf['IRC_SUBONLY']: if subscriber != "0" or 'moderator' in badges or 'broadcaster' in badges: logging.debug('TTS is sub-only and user has allowance') else: logging.info('TTS is sub-only') self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['SUBONLY']) return False if conf['IRC_MODONLY']: if 'moderator' in badges or 'broadcaster' in badges: logging.debug('TTS is mod-only and user has allowance') else: logging.info('TTS is sub-only') self.sendmsg(conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['MODONLY']) return False if conf['WHITELIST']: if user not in self.tts_allowed: logging.info('User is not on whitelist') logging.info(self.tts_allowed) self.sendmsg( conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['WHITELISTONLY'] ) return False else: logging.info('Nobody is on the whitelist.') self.sendmsg( conf['IRC_CHANNEL'], "@"+str(user), conf['MESSAGE']['WHITELISTONLY'] ) return False logging.info('Valid TTS message, adding to raw queue') tts = True now = datetime.datetime.now() msg = msg.replace('!tts','',1) msg = { "TTS": tts, "msg": msg, "badges": badges, "subscriber": subscriber, "msgid": msgid, "user": user, "length": msglen, "queuetime": now, "timestamp": str(time.time_ns()) } msg_queue_raw.append(msg) return True return False class HTTPserv(BaseHTTPRequestHandler): """Simple HTTP Server""" def log_message(self, format, *args): # pylint: disable=redefined-builtin """Suppress HTTP log messages""" return def do_GET(self): # pylint: disable=invalid-name """Process GET requests""" if self.path == '/': self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() with open("tts.html", "rb") as file: html = file.read() self.wfile.write(html) elif self.path == '/favicon.ico': self.send_response(200) self.send_header('Content-type', 'image/x-icon') self.end_headers() with open("favicon.ico", "rb") as file: icon = file.read() self.wfile.write(icon) elif self.path == '/tts.js': self.send_response(200) self.send_header('Content-type', 'text/javascript') self.end_headers() with open("tts.js", "rb") as file: html = file.read() self.wfile.write(html) elif self.path == '/jquery.js': self.send_response(200) self.send_header('Content-type', 'text/javascript') self.end_headers() with open("jquery.js", "rb") as file: html = file.read() self.wfile.write(html) elif self.path == '/bootstrap.min.css': self.send_response(200) self.send_header('Content-type', 'text/css') self.end_headers() with open("bootstrap.min.css", "rb") as file: html = file.read() self.wfile.write(html) elif self.path.startswith('/tts_queue'): tts_json = "" tts = {} self.send_response(200) self.send_header('Content-type', 'text/json') self.end_headers() usermap = conf['USERMAP'] sorted_tts = {k: msg_queue[k] for k in sorted(msg_queue, reverse=False)} for key in list(sorted_tts.keys()): if key not in tts_done: if msg_queue[key][0].lower() in usermap: tts = {str(key): str(usermap[msg_queue[key][0].lower()]) + " " + str(conf['MESSAGE']['SAYS']) + ":" + str(msg_queue[key][1])} else: tts = {str(key): str(msg_queue[key][0]) + " " + str(conf['MESSAGE']['SAYS']) + ":" + str(msg_queue[key][1])} tts_json = json.dumps(tts) self.wfile.write(bytes(str(tts_json)+"\n", "utf-8")) elif self.path.startswith('/tts_done'): get_params = parse_qs(self.path) if '/tts_done?id' in get_params: logging.info("Removing message from queue") tts_done.append(get_params['/tts_done?id'][0]) self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("OK\n", "utf-8")) else: self.send_response(500) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("Internal Server error\n", "utf-8")) elif self.path.startswith('/token') and conf['IRC_OAUTH_TOKEN'] == "Invalid": data = {} data['client_id'] = "ebo548vs6tq54c9zlrgin2yfzzlrrs" data['response_type'] = "token" data['scope'] = "chat:edit chat:read" if conf['HTTP_PORT'] == 80: data['redirect_uri'] = "http://localhost/token" elif conf['HTTP_PORT'] == 8080: data['redirect_uri'] = "http://localhost:8080/token" elif conf['HTTP_PORT'] == 3000: data['redirect_uri'] = "http://localhost:3000/token" else: self.send_response(500) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("You can only use this function if HTTP_PORT is 80, 8080 or 3000. Please change your port, or use https://www.21x9.org/twitch instead.\n", "utf-8")) return False try: url_values = urllib.parse.urlencode(data) url = "https://id.twitch.tv/oauth2/authorize" full_url = url + "?" + url_values data = urllib.request.urlopen(full_url) if data: self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(bytes("OAuth Token Generator
Click to start the OAuth process.
\n", "utf-8")) else: self.send_response(500) self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("Could not get OAuth-URL from Twitch\n", "utf-8")) except Exception: # pylint: disable=broad-except logging.error('Could not fetch OAuth-URL from Twitch.') else: self.send_response(404) self.send_header('Server', 'TTS') self.send_header('Content-type', 'text/plain') self.end_headers() self.wfile.write(bytes("File not found.\n", "utf-8")) return def http_serve_forever(httpd): """httpd loop""" httpd.serve_forever() def load_config(): """Loading config variables""" logging.info("Loading configfile") try: with open("config.yml", "r", encoding="UTF-8") as ymlfile: cfg = yaml.load(ymlfile, Loader=yaml.Loader) except FileNotFoundError: logging.fatal('Your config file is missing, please copy config-dist.yml to config.yml and review your settings.') sys.exit(253) for section in cfg: try: logging.debug('Fetching config: %s', section) conf['IRC_CHANNEL'] = cfg.get('irc', {}).get('channel', False) conf['IRC_USERNAME'] = cfg.get('irc', {}).get('username', False) conf['IRC_OAUTH_TOKEN'] = cfg.get('irc', {}).get('oauth_token', False) conf['IRC_SERVER'] = cfg.get('irc', {}).get('server', "irc.chat.twitch.tv") conf['IRC_CLEARMSG_TIMEOUT'] = cfg.get('irc', {}).get('clearmsg_timeout', 60) conf['IRC_SUBONLY'] = cfg.get('bot', {}).get('subonly', False) conf['IRC_MODONLY'] = cfg.get('bot', {}).get('modonly', False) conf['IRC_TTS_LEN'] = cfg.get('bot', {}).get('message_length', 200) conf['TTS_STARTENABLED'] = cfg.get('bot', {}).get('start_enabled', False) conf['LOG_LEVEL'] = cfg.get('log', {}).get('level', "INFO") conf['HTTP_PORT'] = cfg.get('http', {}).get('port', 80) conf['HTTP_BIND'] = cfg.get('http', {}).get('bind', "localhost") conf['MESSAGE'] = {} conf['MESSAGE']['TOFF'] = cfg.get('messages', {}).get('toff', "TTS is now disabled.") conf['MESSAGE']['TON'] = cfg.get('messages', {}).get('ton', "TTS is now active.") conf['MESSAGE']['TOO_LONG'] = cfg.get('messages', {}).get('too_long', "Sorry, your message is too long.") conf['MESSAGE']['DISABLED'] = cfg.get('messages', {}).get('disabled', "Sorry, TTS is disabled.") conf['MESSAGE']['DENIED'] = cfg.get('messages', {}).get('denied', "Sorry, you're not allowed to use TTS.") conf['MESSAGE']['SUBONLY'] = cfg.get('messages', {}).get('subonly', "Sorry, TTS is sub-only.") conf['MESSAGE']['MODONLY'] = cfg.get('messages', {}).get('modonly', "Sorry, TTS is mod-only.") conf['MESSAGE']['READY'] = cfg.get('messages', {}).get('ready', "TTS bot is ready.") conf['MESSAGE']['WHITELISTONLY'] = cfg.get('messages', {}).get('whitelist', False) conf['MESSAGE']['SAYS'] = cfg.get('messages', {}).get('says', "says") conf['MESSAGE']['VOTESTART'] = cfg.get('messages', {}).get('votestart', "Quickvote started. Send #yourchoice to participate.") conf['MESSAGE']['VOTEEND'] = cfg.get('messages', {}).get('voteend', "Quickvote ended. The results are:") conf['MESSAGE']['VOTENOBODY'] = cfg.get('messages', {}).get('votenobody', "Nobody casted a vote. :(") conf['MESSAGE']['VOTERESULT'] = cfg.get('messages', {}).get('voteresult', "Voting has ended. The result is:") conf['MESSAGE']['VOTES'] = cfg.get('messages', {}).get('votes', "Stimmen") conf['USERMAP'] = cfg.get('usermapping', []) if 'whitelist' in cfg: conf['WHITELIST'] = True conf['WHITELIST_USER'] = cfg['whitelist'] else: conf['WHITELIST'] = False except KeyError: logging.exception('Your config file is invalid, please check and try again.') sys.exit(254) if conf['WHITELIST']: logging.info('Whitelist mode enabled') logging.debug('Whitelist:') logging.debug(conf['WHITELIST_USER']) if not conf['IRC_CHANNEL']: raise ValueError('Please add your twitch channel to config.yml.') if not conf['IRC_USERNAME']: raise ValueError('Please add the bots username to config.yml.') if not conf['IRC_OAUTH_TOKEN']: conf['IRC_OAUTH_TOKEN'] = "Invalid" return conf if not conf['IRC_OAUTH_TOKEN'].startswith('oauth:'): raise ValueError('Your oauth-token is invalid, it has to start with: "oauth:"') return conf conf = {} tts_done = [] msg_queue_raw = [] msg_queue = {} logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(module)s %(threadName)s %(levelname)s: %(message)s') sys.tracebacklimit = 0 if sys.argv[1:]: if sys.argv[1] == "--version": print('Simple TTS Bot') print('Version 1.1.0') sys.exit(1) def main(): """Main loop""" global conf # pylint: disable=global-statement,invalid-name conf = load_config() lastreload = datetime.datetime.now() logging.getLogger().setLevel(conf['LOG_LEVEL']) if conf['LOG_LEVEL'] == 'DEBUG': sys.tracebacklimit = 5 logging.info("Starting Webserver") httpd = socketserver.TCPServer((conf['HTTP_BIND'], conf['HTTP_PORT']), HTTPserv) httpd.allow_reuse_port = True httpd.allow_reuse_address = True http_thread = Thread(target=http_serve_forever, daemon=True, args=(httpd, )) http_thread.start() if conf['IRC_OAUTH_TOKEN'] == "Invalid": logging.error('No OAuth Token, skipping start of IRC bot.') logging.error('Please open http://%s:%s/token to generate your OAuth-Token.', conf['HTTP_BIND'], conf['HTTP_PORT']) url = 'http://'+str(conf['HTTP_BIND'])+':'+str(conf['HTTP_PORT'])+'/token' webbrowser.open_new_tab(url) logging.info('Please complete the OAuth process within the next 15 minutes.') time.sleep(900) sys.exit(250) else: logging.info("Starting IRC bot") irc = IRC() irc.connect(conf['IRC_SERVER'], 6667, conf['IRC_CHANNEL'], conf['IRC_USERNAME'], conf['IRC_OAUTH_TOKEN']) irc.sendmsg(conf['IRC_CHANNEL'], 'MrDestructoid', conf['MESSAGE']['READY']) logging.info("Please open your browser and visit: http://%s:%s/", conf['HTTP_BIND'], conf['HTTP_PORT']) url = 'http://'+str(conf['HTTP_BIND'])+':'+str(conf['HTTP_PORT']) webbrowser.open_new_tab(url) while True: if conf['LOG_LEVEL'] == "DEBUG": time.sleep(1) try: irc.get_response() if not irc.tts_status: logging.debug("TTS is disabled") if conf['LOG_LEVEL'] == "DEBUG": time.sleep(1) continue confreload = datetime.datetime.now() if confreload - lastreload > datetime.timedelta(seconds=60): conf = load_config() lastreload = datetime.datetime.now() if irc.quickvote and irc.votemsg: logging.info('Quickvote is active') irc.sendmsg(conf['IRC_CHANNEL'], "@chat", conf['MESSAGE']['VOTESTART'] + " (" + str(irc.votemsg) + ")") logging.debug('Raw message queue:') logging.debug(msg_queue_raw) for raw_msg in msg_queue_raw: logging.debug('Raw msg:') logging.debug(msg_queue_raw) now = datetime.datetime.now() if now - raw_msg['queuetime'] > datetime.timedelta(seconds=conf['IRC_CLEARMSG_TIMEOUT']): logging.debug('clearmsg_timeout reached') if raw_msg['timestamp'] not in msg_queue: logging.info('Sending TTS message') msg_queue[raw_msg['timestamp']] = [raw_msg['user'], raw_msg['msg']] logging.debug(msg_queue) else: logging.debug('Msg is already in queue') except KeyboardInterrupt: logging.info('Exiting...') sys.exit() if __name__ == "__main__": main()