ubuntu-bots/Encyclopedia/plugin.py

982 lines
40 KiB
Python

# -*- Encoding: utf-8 -*-
###
# Copyright (c) 2006-2007 Dennis Kaarsemaker
# Copyright (c) 2008-2010 Terence Simpson
# Copyright (c) 2018- Krytarik Raido
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of version 2 of the GNU General Public License as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
###
from supybot.commands import *
import supybot.ircmsgs as ircmsgs
import supybot.callbacks as callbacks
import supybot.registry as registry
import supybot.ircdb as ircdb
import supybot.conf as conf
import supybot.utils as utils
import supybot.ircutils as ircutils
import supybot.world as world
import sys, os, re, hashlib, random
import sqlite3, datetime, time
_stripNickChars = """!"#$%&'()*+,./:;<=>?@~"""


def stripNick(nick):
    """Return nick with every trailing character from _stripNickChars removed.

    Falsy input (empty string, None) is handed back unchanged.
    """
    if not nick:
        return nick
    return nick.rstrip(_stripNickChars)
def defaultIgnored(hostmask):
    """Tell whether hostmask is ignored only because of supybot.defaultIgnore.

    Returns True when the defaultIgnore setting is enabled and ircdb knows no
    user for the hostmask; returns False in every other case.
    """
    if conf.supybot.defaultIgnore():
        try:
            ircdb.users.getUser(hostmask)
        except KeyError:
            # No registered user matches while defaultIgnore is active
            return True
    return False
def checkIgnored(hostmask, channel):
    """Return True when messages from hostmask should be ignored.

    A registered user holding the 'trusted' capability is never ignored.
    Otherwise the per-user ignore flag, the global ignore list and (when a
    channel is given) that channel's ignore list are consulted in that order.
    """
    try:
        user = ircdb.users.getUser(hostmask)
        try:
            trusted = user._checkCapability('trusted')
        except KeyError:
            trusted = False
        if trusted:
            return False
        if user.ignore:
            return True
    except KeyError:
        # Unknown user: fall through to the hostmask based checks
        pass
    if ircdb.ignores.checkIgnored(hostmask):
        return True
    if channel and ircdb.channels.getChannel(channel).checkIgnored(hostmask):
        return True
    return False
def checkAddressed(text, channel):
    """Return True when text starts with one of the bot's own prefix chars.

    With a channel, the channel-specific value of
    supybot.reply.whenAddressedBy.chars is used; otherwise the global value.
    text must be non-empty.
    """
    chars = conf.supybot.reply.whenAddressedBy.chars
    if channel:
        prefixes = str(chars.get(channel))
    else:
        prefixes = chars()
    return text[0] in prefixes
# Simple wrapper class for factoids
class Factoid:
    """Plain record holding one factoid row.

    Attributes mirror the constructor arguments: name, value, author, added,
    editor, edited and popularity. Everything except name and value defaults
    to None.
    """

    def __init__(self, name, value, author=None, added=None,
                 editor=None, edited=None, popularity=None):
        self.name = name
        self.value = value
        self.author = author
        self.added = added
        self.editor = editor
        self.edited = edited
        self.popularity = popularity
class FactoidSet:
    """Container for the four lookup results of one factoid name.

    Holds the global and channel-specific variants, each as a primary and a
    secondary entry. All four slots start out as None.
    """

    def __init__(self):
        self.global_primary = None
        self.global_secondary = None
        self.channel_primary = None
        self.channel_secondary = None
# Repeat filtering message queue
msgcache = {}


def queue(irc, target, msg):
    """Send msg privately to target unless it repeats a recent message.

    Messages are remembered for 30 seconds per (irc, target). A message that
    a remembered one already ends with is dropped. A message that merely adds
    a "<something>: " prefix to a remembered one has the repeated part
    replaced by 'Please see above'. While world.testing is set the message is
    sent straight away.
    """
    if world.testing:
        # don't mess up testcases
        irc.reply(msg, to=target, private=True)
        return
    now = time.time()
    # Drop cache entries older than 30 seconds
    expired = [key for key, stamp in msgcache.items() if stamp < now - 30]
    for key in expired:
        msgcache.pop(key)
    duplicate = False
    for cached_irc, cached_target, oldmsg in msgcache:
        if not (cached_irc == irc and cached_target == target):
            continue
        if oldmsg.endswith(msg):
            duplicate = True
            break
        if msg.endswith(oldmsg) and msg[:-len(oldmsg)].endswith(': '):
            msg = msg[:-len(oldmsg)] + 'Please see above'
    if not duplicate:
        msgcache[(irc, target, msg)] = now
        irc.reply(msg, to=target, private=True)
def capab(prefix, capability):
    """Return True when the user identified by prefix holds capability.

    Capabilities form a chain: 'owner' implies 'admin', which implies
    'addeditors', which implies 'editfactoids'. Unknown users hold nothing.
    Always True while world.testing is set.
    """
    # too bad people don't use supybot's own methods,
    # it would save me the trouble of hacking this up
    if world.testing:
        # we're running a testcase, return always True
        return True
    # Capability hierarchy: each key is implied by its value
    implied_by = {
        "editfactoids": "addeditors",
        "addeditors": "admin",
        "admin": "owner",
    }
    stronger = implied_by.get(capability)
    if stronger is not None and capab(prefix, stronger):
        return True
    # Check capability #
    try:
        user = ircdb.users.getUser(prefix)
    except KeyError:
        return False
    return capability in list(user.capabilities)
def get_factoid_label(n, v):
    """Build the display label for factoid name n with value v.

    Aliases are shown as 'name@target', deleted factoids as 'name*', and
    everything else as the plain name.
    """
    if v.startswith('<alias>'):
        return n + '@' + v[7:].strip()
    if v.startswith('<deleted>'):
        return n + '*'
    return n
# This regexp should match most URLs in the format protocol://(domain|ip address)
# and the special case when there's no protocol but the domain starts with www.
# We do this so we can filter obvious requests with spam in them
octet = r'(?:2(?:[0-4]\d|5[0-5])|1\d\d|\d{1,2})'  # 0 - 255
ip_address = octet + r'(?:\.' + octet + r'){3}'  # 0.0.0.0 - 255.255.255.255
# Base domain regex off RFC 1034 and 1738
label = r'[0-9a-z][-0-9a-z]*[0-9a-z]?'
domain = label + r'(?:\.' + label + r')*\.[a-z][-0-9a-z]*[a-z]?'  # like www.ubuntu.com
# complete regexp
urlRe = re.compile(
    r'(?:\w+://(?:' + domain + '|' + ip_address + r')|www\.' + domain + ')',
    re.I)


def checkUrl(s):
    """Check if string contains something like a URL"""
    return urlRe.search(s) is not None
class DbWrapper(object):
    """Pairs a database handle with its name and the time it was wrapped."""

    def __init__(self, db, name):
        # Creation timestamp, in seconds since the epoch
        self.time = time.time()
        self.name = name
        self.db = db
class Encyclopedia(callbacks.Plugin):
    """!factoid: show factoid"""

    def __init__(self, irc):
        # Keep the parent-class proxy around and run the base initialiser
        self.__parent = super(Encyclopedia, self)
        self.__parent.__init__(irc)
        # irc object the plugin was created with (factoid_lookup reads its nick)
        self.defaultIrc = irc
        # Raised by resolve_alias when a factoid listed in 'alert' is requested
        self.alert = False
        # channel -> DbWrapper cache filled by get_db
        self.databases = {}
        self.times = {}
        # factoid name -> time of a pending alias edit (see check_aliases)
        self.edits = {}
class editor(callbacks.Commands):
    def add(self, irc, msg, args, name):
        """<name>
        Adds the user with the name <name> to the list of editors.
        """
        if not capab(msg.prefix, 'addeditors'):
            irc.errorNoCapability('addeditors')
            return
        try:
            user = ircdb.users.getUser(name)
            user.addCapability('editfactoids')
        except Exception:
            # Lookup or capability change failed; interpreter-level
            # exceptions (KeyboardInterrupt, SystemExit) are not swallowed
            irc.error('User %s is not registered' % name)
        else:
            # Reply outside the try so a failing reply is not misreported
            irc.replySuccess()
    add = wrap(add, ['otherUser'])

    def remove(self, irc, msg, args, name):
        """<name>
        Removes the user with the name <name> from the list of editors.
        """
        if not capab(msg.prefix, 'addeditors'):
            irc.errorNoCapability('addeditors')
            return
        try:
            user = ircdb.users.getUser(name)
            user.removeCapability('editfactoids')
        except Exception:
            irc.error('User %s is not registered or not an editor' % name)
        else:
            irc.replySuccess()
    remove = wrap(remove, ['otherUser'])

    def editors(self, irc, msg, args):
        """Takes no arguments
        Lists all the users who are in the list of editors.
        """
        # capab() is asked by user name, so inherited capabilities count too
        names = [u.name for u in list(ircdb.users.users.values())
                 if capab(u.name, 'editfactoids')]
        irc.reply(', '.join(names), private=True)
    editors = wrap(editors)

    def moderators(self, irc, msg, args):
        """Takes no arguments
        Lists all the users who can add users to the list of editors.
        """
        names = [u.name for u in list(ircdb.users.users.values())
                 if capab(u.name, 'addeditors')]
        irc.reply(', '.join(names), private=True)
    moderators = wrap(moderators)
def get_target(self, irc, nick, text, target):
    """Work out what to look up, where to send it and how to prefix it.

    Recognised forms are 'factoid | nick' (reply in place, addressed to
    nick), 'factoid > nick' and 'tell nick about factoid' (reply sent to
    nick). 'me' stands for the requesting nick. Text that looks like a
    factoid creation or edit is returned untouched.

    Returns a tuple (factoid text, reply target, reply prefix). Redirecting
    to another channel is refused, and a user who redirects to themselves
    from a channel is answered in private with a hint.
    """
    ret, retmsg = text, ''
    orig_target = target
    # Test for factoid creation/edit before continuing
    if re.match(r'\S+\s+is(\s+|\s*<(reply|alias|sed)>\s*)\S+', text, re.I):
        return ret, target, retmsg
    if '|' in text:
        head, _, tail = text.partition('|')
        ret = head.strip()
        retmsg = tail.lstrip('|').strip()
        if retmsg:
            if retmsg.lower() == "me":
                retmsg = nick
            retmsg = "%s: " % retmsg
    elif '>' in text:
        head, _, tail = text.partition('>')
        ret = head.strip()
        words = tail.lstrip('>').split()
        # Take the first "nick" and strip off bad chars
        target = stripNick(words[0]) if words else ''
        if target:
            if target.lower() == "me":
                target = nick
            retmsg = "<%s> wants you to know: " % nick
        else:
            # Nothing usable followed the '>' (empty, or punctuation only):
            # answer where the request came from instead of an empty target
            target = orig_target
    else:
        match = re.match(r'^tell\s+(?P<target>\S+)\s+about\s+(?P<ret>.+)$', text, re.I)
        if match:
            target = match.group('target')
            ret = match.group('ret')
            if target.lower() == "me":
                target = nick
            retmsg = "<%s> wants you to know: " % nick
    if target.lower() != orig_target.lower() and irc.isChannel(target):
        target = orig_target
        retmsg = "(Forwarding to channels is not permitted) "
    elif nick.lower() in (target.lower(), retmsg[:-2].lower()) \
            and nick.lower() != orig_target.lower():
        # retmsg[:-2] drops the trailing ": " of a '|' style prefix
        target = nick
        retmsg = "(In the future, please use a private message to investigate) "
    return ret, target, retmsg
def get_db(self, channel):
    """Return the sqlite3 connection serving channel.

    Connections are cached per channel in self.databases. A cached
    connection is closed and reopened when it is older than an hour or when
    the configured 'database' name for the channel has changed.
    """
    db = self.registryValue('database', channel)
    cached = self.databases.get(channel)
    if cached is not None \
            and (cached.time < time.time() - 3600 or cached.name != db):
        cached.db.close()
        self.databases.pop(channel)
        cached = None
    if cached is None:
        path = os.path.join(self.registryValue('datadir'), '%s.db' % db)
        cached = DbWrapper(sqlite3.connect(path), db)
        self.databases[channel] = cached
    return cached.db
def get_factoids(self, name, channel, display=None):
    """Collect the global and channel variants of factoid name.

    display selects the post-processing: None resolves aliases and bumps
    the popularity counter, 'info' replaces each entry with its
    factoid_info() text, 'alias' and any other value return the raw rows.
    Deleted factoids are only included for display values other than None
    and 'alias'. Channel-specific entries are only fetched when display is
    None and a channel is given.
    """
    factoids = FactoidSet()
    deleted = bool(display) and display != 'alias'
    factoids.global_primary = self.get_single_factoid(channel, name, deleted)
    factoids.global_secondary = self.get_single_factoid(channel, name + '-also', deleted)
    if channel and not display:
        factoids.channel_primary = self.get_single_factoid(
            channel, name + '-' + channel.lower(), deleted)
        factoids.channel_secondary = self.get_single_factoid(
            channel, name + '-' + channel.lower() + '-also', deleted)
    if not display:
        self.increment_factoid_popularity(factoids, channel)
        transform = self.resolve_alias
    elif display == 'info':
        transform = self.factoid_info
    else:
        return factoids
    slots = ['global_primary', 'global_secondary']
    if channel:
        slots.extend(['channel_primary', 'channel_secondary'])
    for slot in slots:
        setattr(factoids, slot, transform(channel, getattr(factoids, slot)))
    return factoids
def increment_factoid_popularity(self, factoids, channel):
    """Bump the request counter of the factoids about to be shown.

    For the primary and then the secondary slot, the channel entry is
    preferred over the global one; only the first populated entry of each
    slot is counted.

    The increment is done inside the UPDATE statement rather than from the
    in-memory value, so it cannot overwrite the counter with a stale number
    and it copes with a NULL popularity.
    """
    for order in ('primary', 'secondary'):
        for loc in ('channel', 'global'):
            factoid = getattr(factoids, '%s_%s' % (loc, order))
            if not factoid:
                continue
            if isinstance(factoid, Factoid):
                db = self.get_db(channel)
                cur = db.cursor()
                cur.execute(
                    "UPDATE facts SET popularity = COALESCE(popularity, 0) + 1 WHERE name = ?",
                    (factoid.name,))
                db.commit()
            # The first populated entry wins for this slot
            break
def get_single_factoid(self, channel, name, deleted=False):
    """Fetch one factoid by (lower-cased) name from the channel's database.

    Rows whose value starts with '<deleted>' are skipped unless deleted is
    true. Returns a Factoid, or None when nothing matches.
    """
    if deleted:
        query = "SELECT name, value, author, added, editor, edited, popularity FROM facts WHERE name = ?"
    else:
        query = "SELECT name, value, author, added, editor, edited, popularity FROM facts WHERE name = ? AND value NOT LIKE '<deleted>%%'"
    cur = self.get_db(channel).cursor()
    cur.execute(query, (name.lower(),))
    row = cur.fetchone()
    if row is not None:
        return Factoid(*row)
def resolve_alias(self, channel, factoid, loop=0):
    """Follow <alias> factoids until a real one is reached.

    Returns the final Factoid, None when factoid is empty, or an error
    string when the chain is longer than 10 steps or points at nothing.
    Sets self.alert when a visited factoid's name is listed in the 'alert'
    setting.
    """
    if not factoid:
        return
    if loop >= 10:
        return "Error: Infinite <alias> loop detected"
    if factoid.name in self.registryValue('alert', channel):
        self.alert = True
    if not factoid.value.startswith('<alias>'):
        return factoid
    alias_name = factoid.value[7:].strip()
    factoids = self.get_factoids(alias_name, channel, display='alias')
    for slot in ('channel_primary', 'global_primary'):
        candidate = getattr(factoids, slot)
        if candidate:
            return self.resolve_alias(channel, candidate, loop+1)
    return "Error: Unresolvable <alias> to '%s'" % alias_name
def factoid_info(self, channel, factoid):
    """Describe a factoid: label, aliases, author, last editor, popularity.

    Returns a single ' - ' separated string, or None when factoid is empty.
    Author and editor are cut at the first '!' (nick part of a hostmask),
    timestamps at the last '.' (fraction of a second). When the separator
    is absent the value is shown whole instead of losing its last character.
    """
    if not factoid:
        return
    ret = [get_factoid_label(factoid.name, factoid.value)]
    db = self.get_db(channel)
    cur = db.cursor()
    # Try and find aliases ('_' matches the single character after '<alias>')
    cur.execute("SELECT name FROM facts WHERE value LIKE ?", ('<alias>_%s' % factoid.name,))
    data = cur.fetchall()
    if data:
        ret.append("aliases: %s" % ', '.join([x[0] for x in data]))
    else:
        ret.append("no aliases")
    # Author info
    ret.append("added by %s on %s" % (
        factoid.author.split('!', 1)[0], factoid.added.rsplit('.', 1)[0]))
    if factoid.editor:
        ret.append("last edited by %s on %s" % (
            factoid.editor.split('!', 1)[0], factoid.edited.rsplit('.', 1)[0]))
    # Popularity info
    ret.append("requested %d times" % factoid.popularity)
    return ' - '.join(ret)
def check_aliases(self, channel, factoid):
    """Validate an edit that turns factoid into an <alias>.

    Returns None when the edit may proceed, otherwise a message for the
    user: a request to confirm when an existing alias is being changed (the
    same edit must be repeated within 10 seconds), or an error when the
    alias target is missing or cannot be resolved.
    """
    now = time.time()
    # Forget confirmation requests older than 10 seconds
    stale = [name for name, stamp in self.edits.items() if stamp < now - 10]
    for name in stale:
        self.edits.pop(name)
    if not factoid.value.startswith('<alias>'):
        return
    # Was the old value an alias?
    previous = self.get_single_factoid(channel, factoid.name)
    if previous and previous.value.startswith('<alias>') \
            and factoid.name not in self.edits:
        self.edits[factoid.name] = now
        return "You are editing an alias. Please repeat the edit command within the next 10 seconds to confirm"
    # Do some alias resolving
    alias_name = factoid.value[7:].strip()
    if not self.get_single_factoid(channel, alias_name):
        return "Factoid '%s' does not exist" % alias_name
    resolved = self.resolve_alias(channel, factoid)
    if not isinstance(resolved, Factoid):
        return resolved
def inFilter(self, irc, msg):
    """Feed PRIVMSGs from default-ignored users to doPrivmsg.

    Only messages with a user hostmask prefix whose sender is covered by
    supybot.defaultIgnore, and is not otherwise ignored, are handled here.
    The message is always returned unchanged.
    """
    handle = (
        msg.prefix and msg.args
        and ircutils.isUserHostmask(msg.prefix)
        and defaultIgnored(msg.prefix)
        and not checkIgnored(msg.prefix, msg.channel)
        and msg.command == "PRIVMSG"
    )
    if handle:
        self.doPrivmsg(irc, msg)
    return msg
def doPrivmsg(self, irc, msg):
# Handles one PRIVMSG: works out whether it is a factoid request, a factoid
# edit (add/edit/delete) or something to ignore, then sends the reply.
# Replies go through the repeat-filtering queue() unless queue_msg is cleared.
queue_msg = True # Queue message or send directly
def noqueue(irc, target, msg):
irc.reply(msg, to=target, private=True)
def myqueue(irc, target, msg):
(queue if queue_msg else noqueue)(irc, target, msg)
# Filter CTCP
if chr(1) in msg.args[1]:
return
# Python 2 receives a byte string and decodes it; Python 3 already has text
if sys.version_info < (3,0):
text = msg.args[1].decode('utf-8').strip()
else:
text = msg.args[1].strip()
if not text:
return
target = msg.args[0]
channel = msg.channel
# Text starting with one of the bot's own command prefix characters is skipped
if checkAddressed(text, channel):
return
if not self.registryValue('enabled', channel):
return
# In private, skip text whose first word is a command of any loaded callback
if not channel:
cmd = text.split()[0].lower()
for c in irc.callbacks:
if c.isCommandMethod(cmd):
return
# Strip the factoid prefix, either bare or after "<botnick><separator>".
# In a channel, text without the prefix is not a request.
prefixchar = self.registryValue('prefixchar', channel)
if text[0] == prefixchar:
text = text[1:].strip()
elif re.match(r'^%s[\W\s]\s*%s' % (re.escape(irc.nick), re.escape(prefixchar)), text, re.I):
text = re.sub(r'^%s[\W\s]\s*%s\s*' % (re.escape(irc.nick), re.escape(prefixchar)), '', text, flags=re.I)
elif channel:
return
if not text:
return
# First word listed in the 'ignores' setting: leave it alone
if text.split()[0].lower() in self.registryValue('ignores', channel):
return
doChanMsg = True # Send message to channel
display = None # Set display type
edit = None
if not channel:
target = msg.nick
# Get display type
# Leading '-' selects 'info', leading '+' selects 'raw'; the last one seen wins
while text and text[0] in '-+':
if text[0] == '-':
display = 'info'
elif text[0] == '+':
display = 'raw'
text = text[1:].strip()
if not text:
return
irc = callbacks.ReplyIrcProxy(irc, msg)
# Now switch between actions
ret, retmsg = '', ''
# term is the (factoid text, target, reply prefix) tuple from get_target
term = self.get_target(irc, msg.nick, text, target)
lower_term = term[0].lower()
# A "please see <bot> ..." style request suppresses the notice in the channel
if re.match(r"please\s+see\s+((%s|(the\s+)?bot)(\s*'?s)?\s+\S+|\S+.*\s+\S+\s+(%s|(the\s+)?bot)\b)" % (re.escape(irc.nick), re.escape(irc.nick)), text, re.I):
doChanMsg = False
if lower_term == "search": # Usage info for the !search command
ret = "Search factoids for term: !search <term>"
target = term[1]
retmsg = term[2]
elif re.match(r"^seen\b", lower_term): # Some people expect a '!seen <nick>' command
ret = "I have no seen command"
retmsg = "%s: " % msg.nick if term[2] else '' # Redirect back at the caller, rather than the target
elif lower_term.startswith("google "): # Some people expect a '!google <term...>' command
ret = "I have no google command, use https://www.google.com/"
retmsg = "%s: " % msg.nick if term[2] else '' # Redirect back at the caller, rather than the target
elif re.match(r"^what'?s?\b", lower_term): # Try and catch people saying "ubottu: what is ...?"
ret = "I am only a bot, please don't think I'm intelligent :)"
retmsg = "%s: " % msg.nick if channel else ''
# Lookup, search or edit?
elif lower_term.startswith('search '):
ret = self.search_factoid(text[7:].strip(), channel)
elif lower_term.startswith('delete '):
edit = 'delete'
elif re.match(r'^no[\s,]+[^|\s]+[^|]*?\s+is(\s+|\b)\S+', text, re.I) \
or re.match(r'^[^|\s]+[^|]*?\s*(\s+is\s*<sed>|=~|~=)\s*\S+', text, re.I) \
or lower_term.startswith(('forget ', 'unforget ')):
edit = 'edit'
elif re.match(r'^[^|\s]+[^|]*?\s+is(\s+|\b)\S+', text, re.I):
edit = 'add'
else:
# Plain lookup; the redirect parsed by get_target only applies without a display modifier
if not display:
text, target, retmsg = term
ret = self.factoid_lookup(text, channel, msg.nick, display)
if edit:
# factoid_<edit>_check returns a (type, factoid, reply) tuple when the
# edit is valid, otherwise a message for the user or None
check = getattr(self, 'factoid_%s_check' % edit)(text, channel, msg.nick)
if not isinstance(check, tuple):
ret = check
elif not (capab(msg.prefix, 'editfactoids') \
or (channel in self.registryValue('editchannel') \
and capab(msg.prefix, 'restricted-editor'))):
# Requester may not edit directly: long names are treated as chatter,
# anything else is relayed and logged as an edit request
if len(check[1].name.split()) > 4:
ret = "I am only a bot, please don't think I'm intelligent :)"
retmsg = "%s: " % msg.nick if channel else ''
else:
# NOTE(review): this reads 'relaychannel' while the alert relay further
# down reads 'relayChannel' — confirm which spelling the config registers
relaychan = self.registryValue('relaychannel', channel)
if channel and channel == relaychan:
irc.reply("%s: Your edit request will be attended to as soon as possible. " % msg.nick
+ "Thank you for your attention to detail", to=channel)
else:
irc.reply("Your edit request has been forwarded to %s. " % relaychan
+ "Thank you for your attention to detail", private=True)
irc.reply("In %s, %s said: %s" % (msg.args[0], msg.nick, msg.args[1]), to=relaychan)
self.log_request(check[0], check[1], channel, msg.prefix)
return
else:
# Authorised edit: apply it and send the confirmation without repeat filtering
queue_msg = False
getattr(self, 'factoid_%s' % edit)(check[1], channel, msg.prefix)
ret = check[2]
if not ret:
# Nothing found: long text is treated as chatter, otherwise the
# configured not-found message is used
if len(text.split()) > 4:
ret = "I am only a bot, please don't think I'm intelligent :)"
retmsg = "%s: " % msg.nick if channel else ''
else:
ret = self.registryValue('notfoundmsg')
# Fill in the request only when the message has exactly one '%s' placeholder
if ret.count('%') == ret.count('%s') == 1:
if sys.version_info < (3,0):
ret = ret % repr(text).lstrip('u')
else:
ret = ret % repr(text)
if not channel or self.registryValue('privateNotFound', channel):
myqueue(irc, msg.nick, ret)
else:
myqueue(irc, channel, ret)
return
# A string is a single message; a list comes from factoid_lookup
if not isinstance(ret, list):
myqueue(irc, target, retmsg + ret)
else:
# self.alert is raised by resolve_alias for factoids listed in 'alert'
if self.alert:
if channel and not channel.endswith('bots'):
irc.reply('%s called the ops in %s (%s)' % (msg.nick, msg.args[0], retmsg),
to=self.registryValue('relayChannel', channel))
self.alert = False
if retmsg and checkUrl(retmsg):
# !ops factoid called with a URL, most likely spam
return
# Answer redirected to a user who is present in the channel: point them at the private message
if doChanMsg and channel and not irc.isChannel(target) \
and target in irc.state.channels[channel].users:
myqueue(irc, channel, "%s: Please see my private message" % target)
myqueue(irc, target, retmsg + ret[0])
for r in ret[1:]:
myqueue(irc, target, r)
def doPart(self, irc, msg):
    """React to a forced part ('requested by ... (reason)').

    The text between the parentheses of the part message is passed to
    _forcedFactoid for the parting nick. Other parts are ignored.
    """
    if len(msg.args) >= 2 and msg.args[1].startswith('requested by'):
        channel, reason = msg.args
        # get the text between ()
        reason = reason[reason.find('(')+1:-1]
        self._forcedFactoid(irc, channel, msg.nick, reason)
def doKick(self, irc, msg):
    """Pass the kick reason to _forcedFactoid for the kicked nick."""
    chan, kicked, why = msg.args
    self._forcedFactoid(irc, chan, kicked, why)
def _forcedFactoid(self, irc, channel, nick, reason):
if not self.registryValue('forcedFactoid', channel):
return
factoidRe = re.compile(r'%s\w+\b' % re.escape(self.registryValue('prefixchar', channel)))
factoids = factoidRe.findall(reason)
if not factoids:
return
L = []
for fact in factoids:
L.extend(self.factoid_lookup(fact[1:], channel, nick))
for s in L:
irc.reply(s, to=nick, private=True)
def factoid_delete(self, factoid, channel, editor):
    """Log the deletion, then remove the factoid's row from the database."""
    stamp = str(datetime.datetime.utcnow())
    self.log_edit('delete', factoid, channel, editor, stamp)
    connection = self.get_db(channel)
    connection.cursor().execute("DELETE FROM facts WHERE name = ?", (factoid.name,))
    connection.commit()
def factoid_delete_check(self, text, channel, editor):
    """Validate a 'delete <name>' request.

    Returns ('delete', factoid, reply) when the deletion may go ahead, a
    message string when it may not, or None when text is not a delete
    request.
    """
    if not text.lower().startswith('delete '):
        return
    name = text[7:].strip()
    factoid = self.get_single_factoid(channel, name, deleted=True)
    if not factoid:
        return "I know nothing about '%s' yet, %s" % (name, editor)
    problem = self.check_aliases(channel, factoid)
    if problem:
        return problem
    return 'delete', factoid, "I'll delete that, %s" % editor
def factoid_edit(self, factoid, channel, editor):
    """Log the edit, then store the factoid's new value with editor and time."""
    stamp = str(datetime.datetime.utcnow())
    self.log_edit('edit', factoid, channel, editor, stamp)
    connection = self.get_db(channel)
    params = (factoid.value, editor, stamp, factoid.name)
    connection.cursor().execute(
        "UPDATE facts SET value = ?, editor = ?, edited = ? WHERE name = ?", params)
    connection.commit()
def factoid_edit_check(self, text, channel, editor):
    """Validate an edit request and prepare the edited factoid.

    Understands 'no, <name> is <value>', 'forget <name>',
    'unforget <name>' and sed-style '<name> =~ s/regex/replacement/'
    (also '~=' and 'is <sed>').

    Returns ('edit', factoid, reply) with factoid.value already updated
    when the edit may go ahead, a message string when it may not, or None
    when text does not parse as an edit.
    """
    factoid = value = retmsg = None
    lowered = text.lower()
    if lowered[:3] in ('no ', 'no,'):
        match = re.match(r'^no[\s,]+(?P<name>\S+.*?)\s+is(?:\s+|\b)(?P<value>.*?\S+)$', text, re.I)
        if not match:
            return
        name = match.group('name')
        value = match.group('value')
        factoid = self.get_single_factoid(channel, name)
        if not factoid:
            return "I know nothing about '%s' yet, %s" % (name, editor)
        if value == factoid.value:
            return "Nothing changed there"
        factoid.value = value
        retmsg = "I'll remember that, %s" % editor
    elif lowered.startswith('forget '):
        name = text[7:].strip()
        factoid = self.get_single_factoid(channel, name)
        if not factoid:
            return "I know nothing about '%s' yet, %s" % (name, editor)
        # Forgetting only marks the value; 'unforget' removes the marker
        factoid.value = '<deleted>%s' % factoid.value
        retmsg = "I'll forget that, %s" % editor
    elif lowered.startswith('unforget '):
        name = text[9:].strip()
        factoid = self.get_single_factoid(channel, name, deleted=True)
        if not factoid:
            return "I know nothing about '%s' at all, %s" % (name, editor)
        if not factoid.value.startswith('<deleted>'):
            return "Factoid '%s' is not deleted yet, %s" % (factoid.name, editor)
        factoid.value = factoid.value[9:]
        retmsg = "I suddenly remember '%s' again, %s" % (factoid.name, editor)
    else:
        match = re.match(r'^(?P<name>\S+.*?)\s*(?:\s+is\s*<sed>|=~|~=)\s*(?P<regex>.*?\S+)$', text, re.I)
        if not match:
            return
        # Split into name and regex
        name = match.group('name')
        regex = match.group('regex')
        # Edit factoid
        factoid = self.get_single_factoid(channel, name)
        if not factoid:
            return "I know nothing about '%s' yet, %s" % (name, editor)
        # Grab the regex
        if regex.startswith('s'):
            regex = regex[1:]
        if not regex:
            # A bare 's' leaves nothing to take a delimiter from
            return "Not enough delimiters"
        delimiter = regex[0]
        if regex[-1] != delimiter:
            return "Missing end delimiter"
        if regex.count(delimiter) > 3:
            return "Too many delimiters"
        if regex.count(delimiter) < 3:
            return "Not enough delimiters"
        regex, replace = regex[1:-1].split(delimiter)
        try:
            # sub() is inside the try as well: a bad replacement template
            # (e.g. a reference to a missing group) raises re.error too
            value = re.compile(regex).sub(replace, factoid.value, 1)
        except re.error:
            return "Malformed regex"
        if value == factoid.value:
            return "Nothing changed there"
        factoid.value = value
        retmsg = "I'll remember that, %s" % editor
    ret = self.check_aliases(channel, factoid)
    if ret:
        return ret
    return 'edit', factoid, retmsg
def factoid_add(self, factoid, channel, editor):
    """Log the addition, then insert the new factoid with author and time."""
    stamp = str(datetime.datetime.utcnow())
    self.log_edit('add', factoid, channel, editor, stamp)
    connection = self.get_db(channel)
    row = (factoid.name, factoid.value, editor, stamp)
    connection.cursor().execute(
        "INSERT INTO facts (name, value, author, added) VALUES (?, ?, ?, ?)", row)
    connection.commit()
def factoid_add_check(self, text, channel, editor):
    """Validate a '<name> is <value>' request for a new factoid.

    A value starting with 'also ' or 'also:' creates the '<name>-also'
    factoid instead. Returns ('add', factoid, reply) when the factoid may
    be added, a message string when it may not, or None when text does not
    parse or the value is empty.
    """
    match = re.match(r'^(?P<name>\S+.*?)\s+is(?:\s+|\b)(?P<value>.*?\S+)$', text, re.I)
    if match is None:
        return
    name = match.group('name').lower()
    value = match.group('value')
    # A tag inside the name is almost certainly a mistyped edit
    if re.match(r'.*<(reply|alias|sed)>.*', name):
        return "You likely don't want this, %s" % editor
    if value.startswith(('also ', 'also:')):
        name += '-also'
        value = value[5:].strip()
    if not value:
        return
    if self.get_single_factoid(channel, name, deleted=True):
        return "But '%s' already means something else!" % name
    factoid = Factoid(name, value)
    problem = self.check_aliases(channel, factoid)
    if problem:
        return problem
    return 'add', factoid, "I'll remember that, %s" % editor
def factoid_lookup(self, text, channel, nick, display=None):
    """Look up factoid text and return the list of reply lines.

    For the primary and then the secondary slot, the channel entry is
    preferred over the global one; with display 'info' both are listed.
    display 'raw' returns stored values untouched. Otherwise $-variables
    are expanded, '<reply>' values are sent as they are and primary values
    are rendered as '<name> is <value>'.
    """
    expandos = {
        '$who': nick, '$nick': self.defaultIrc.nick, '$chan': channel or self.defaultIrc.nick,
        '$curStable': self.registryValue('curStable'), '$curStableLower': self.registryValue('curStable').lower(),
        '$curStableLong': self.registryValue('curStableLong'), '$curStableNum': self.registryValue('curStableNum'),
        '$curLTS': self.registryValue('curLTS'), '$curLTSLower': self.registryValue('curLTS').lower(),
        '$curLTSLong': self.registryValue('curLTSLong'), '$curLTSNum': self.registryValue('curLTSNum'),
        '$curDevel': self.registryValue('curDevel'), '$curDevelLower': self.registryValue('curDevel').lower(),
        '$curDevelLong': self.registryValue('curDevelLong'), '$curDevelNum': self.registryValue('curDevelNum')
    }

    def expand(value):
        # Reverse-sorted so e.g. '$curStableLower' is replaced before '$curStable'
        for variable in sorted(expandos, reverse=True):
            value = value.replace(variable, expandos[variable])
        return value

    factoids = self.get_factoids(text, channel, display=display)
    ret = []
    for order in ('primary', 'secondary'):
        for loc in ('channel', 'global'):
            factoid = getattr(factoids, '%s_%s' % (loc, order))
            if not factoid:
                continue
            if not isinstance(factoid, Factoid):
                # Already a finished string (info text or alias error)
                ret.append(factoid)
            elif display == 'raw':
                ret.append(factoid.value)
            elif factoid.value.startswith('<reply>'):
                ret.append(expand(factoid.value[7:].strip()))
            elif order == 'secondary':
                ret.append(expand(factoid.value))
            else:
                name = factoid.name
                if '-#' in name:
                    # Hide the '-#channel' suffix of channel-specific factoids
                    name = name[:name.rfind('-#')]
                ret.append('%s is %s' % (name, expand(factoid.value)))
            if display != 'info':
                break
    return ret
def log_edit(self, etype, factoid, channel, editor, edited):
    """Append one row describing an add/edit/delete to the 'log' table."""
    connection = self.get_db(channel)
    row = (etype, factoid.name, factoid.value, editor, edited)
    connection.cursor().execute(
        "INSERT INTO log (type, name, value, author, added) VALUES (?, ?, ?, ?, ?)", row)
    connection.commit()
def log_request(self, rtype, factoid, channel, requester):
    """Record an edit request in the 'requests' table.

    A request with the same type and value for the same factoid name is
    stored only once.
    """
    connection = self.get_db(channel)
    cur = connection.cursor()
    cur.execute("SELECT type, value FROM requests WHERE name = ?", (factoid.name,))
    if any(kind == rtype and value == factoid.value for kind, value in cur.fetchall()):
        return
    row = (rtype, factoid.name, factoid.value, requester, str(datetime.datetime.utcnow()))
    cur.execute(
        "INSERT INTO requests (type, name, value, requester, requested) VALUES (?, ?, ?, ?, ?)", row)
    connection.commit()
def search_factoid(self, text, channel):
    """Search factoid names and values for up to five whitespace-separated terms.

    Every term must match the name (lower-cased term) or the value (term as
    typed or lower-cased). Results are ordered by popularity.

    Returns 'Found: <labels>' or 'None found'. Empty search text yields
    'None found' instead of building an invalid SQL statement.
    """
    keys = text.split()[:5]
    if not keys:
        # No terms would leave the WHERE clause empty
        return "None found"
    qterms = ' AND '.join(['(name LIKE ? OR value LIKE ? OR value LIKE ?)'] * len(keys))
    values = []
    for k in keys:
        values.extend(['%%%s%%' % k.lower(), '%%%s%%' % k, '%%%s%%' % k.lower()])
    cur = self.get_db(channel).cursor()
    # Only the placeholder groups are interpolated; the terms are bound parameters
    cur.execute("SELECT name, value FROM facts WHERE %s ORDER BY popularity DESC" % qterms, values)
    ret = [get_factoid_label(r[0], r[1]) for r in cur.fetchall()]
    if not ret:
        return "None found"
    return 'Found: %s' % ', '.join(ret)
def sync(self, irc, msg, args, channel):
    """[<channel>]
    Downloads a copy of the database from the remote server.
    Set the server with the channel variable supybot.plugins.Encyclopedia.remotedb.
    If <channel> is not set, it will default to the channel
    the command is given in or the global value.
    """
    if not capab(msg.prefix, "owner"):
        irc.errorNoCapability("owner")
        return

    def download_database(location, dpath):
        """Download the database located at location to path dpath"""
        tmp_db = "%s%stmp" % (dpath, os.extsep)
        data = utils.web.getUrl(location)
        # Download to a temporary file. Binary mode: the payload is a
        # database image, not text.
        # NOTE(review): assumes utils.web.getUrl returns bytes on Python 3 — confirm
        with open(tmp_db, 'wb') as fd:
            fd.write(data)
        # Do some checking to make sure we have an SQLite database
        with open(tmp_db, 'rb') as fd:
            check = fd.read(15)
        if check != b'SQLite format 3':
            # Remove the temporary file and raise an error
            os.remove(tmp_db)
            raise RuntimeError("Downloaded file was not an SQLite 3 database")
        # OK, rename to dpath
        os.rename(tmp_db, dpath)
        # Drop the cached connection so the next get_db() opens the new file
        cached = self.databases.pop(channel, None)
        if cached is not None:
            try:
                cached.db.close()
            except Exception:
                # Best effort: the old handle is discarded either way
                pass

    db = self.registryValue('database', channel)
    if not db:
        if channel:
            irc.error("I don't have a database set for %s" % channel)
            return
        irc.error("There is no global database set, use 'config supybot.plugins.Encyclopedia.database <database>' to set it")
        return
    remotedb = self.registryValue('remotedb', channel)
    if not remotedb:
        if channel:
            irc.error("I don't have a remote database set for %s" % channel)
            return
        irc.error("There is no global remote database set, use 'config supybot.plugins.Encyclopedia.remotedb <url>' to set it")
        return
    dbpath = os.path.join(self.registryValue('datadir'), '%s.db' % db)
    backup = "%s.bak" % dbpath
    # We're moving files and downloading, lots can go wrong so use lots of try blocks
    try:
        os.rename(dbpath, backup)
    except OSError:
        # File doesn't exist yet, so nothing to back up
        pass
    except Exception as e:
        self.log.error("Encyclopedia: Could not rename %s to %s.bak" % (dbpath, dbpath))
        self.log.error('Encyclopedia: %s' % utils.exnToString(e))
        irc.error("Internal error, see log")
        return
    try:
        # Downloading can take some time, let the user know we're doing something
        irc.reply("Attemting to download database")
        download_database(remotedb, dbpath)
        irc.replySuccess()
    except Exception as e:
        self.log.error("Encyclopedia: Could not download %s to %s" % (remotedb, dbpath))
        self.log.error('Encyclopedia: %s' % utils.exnToString(e))
        irc.error("Internal error, see log")
        # Put the previous database back, if one was backed up
        if os.path.exists(backup):
            os.rename(backup, dbpath)
        return
sync = wrap(sync, [optional("channel")])
def lookup(self, irc, msg, args, author, channel):
    """[<author>] [<channel>]
    Looks up factoids created or edited by <author>,
    <author> defaults to you.
    """
    if not capab(msg.prefix, "editfactoids"):
        irc.errorNoCapability("editfactoids")
        return
    if not author:
        author = msg.prefix
    if '!' in author:
        # Keep only the nick part of a hostmask
        author = author[:author.find('!')]
    pattern = '%s%%' % author
    cur = self.get_db(channel).cursor()
    cur.execute("SELECT name, value FROM facts WHERE author LIKE ? ORDER BY popularity DESC", (pattern,))
    authored = [get_factoid_label(row[0], row[1]) for row in cur.fetchall()]
    # Edited by the author, but not originally written by them
    cur.execute("SELECT name, value FROM facts WHERE editor LIKE ? AND author NOT LIKE ? ORDER BY popularity DESC", (pattern, pattern))
    edited = [get_factoid_label(row[0], row[1]) for row in cur.fetchall()]
    if authored:
        auth_rmsg = 'Authored: %s' % ', '.join(authored)
    else:
        auth_rmsg = "Authored: None found"
    if edited:
        edit_rmsg = 'Edited: %s' % ', '.join(edited)
    else:
        edit_rmsg = "Edited: None found"
    irc.reply(auth_rmsg)
    irc.reply(edit_rmsg)
lookup = wrap(lookup, [optional("otherUser"), optional("channel")])
class ignore(callbacks.Commands):
    def add(self, irc, msg, args, banmask, expires, channel):
        """<hostmask|nick> [<expires>] [<channel>]
        Ignores commands/requests from <hostmask> or <nick>. If <expires> is
        given, the ignore will expire after that ammount of seconds.
        If <channel> is given, the ignore will only apply in that channel.
        """
        if not capab(msg.prefix, "editfactoids"):
            irc.errorNoCapability("editfactoids")
            return
        if channel:
            # Channel ignore: update the channel record and store it back
            chan = ircdb.channels.getChannel(channel)
            chan.addIgnore(banmask, expires)
            ircdb.channels.setChannel(channel, chan)
        else:
            ircdb.ignores.add(banmask, expires)
        irc.replySuccess()
    add = wrap(add, ['hostmask', optional("expiry", 0), optional("channel")])

    def remove(self, irc, msg, args, banmask, channel):
        """<hostmask|nick> [<channel>]
        Removes an ignore previously set by @ignore. If <channel> was
        given in the original @ignore command, it must be given here.
        """
        if not capab(msg.prefix, "editfactoids"):
            irc.errorNoCapability("editfactoids")
            return
        if channel:
            chan = ircdb.channels.getChannel(channel)
            try:
                chan.removeIgnore(banmask)
                ircdb.channels.setChannel(channel, chan)
                irc.replySuccess()
            except KeyError:
                irc.error("That hostmask is not ignored in %s" % channel)
            return
        try:
            ircdb.ignores.remove(banmask)
            irc.replySuccess()
        except KeyError:
            irc.error("That hostmask is not globally ignored")
    remove = wrap(remove, ['hostmask', optional("channel")])

    def ignores(self, irc, msg, args, channel):
        """[<channel>]
        Lists all ignores set by @ignore. If <channel> is given,
        this will only list ignores set in that channel.
        """
        if not capab(msg.prefix, "editfactoids"):
            irc.errorNoCapability("editfactoids")
            return
        if channel:
            chan = ircdb.channels.getChannel(channel)
            if chan.ignores:
                masks = sorted(chan.ignores)
                irc.reply(utils.str.commaAndify(list(map(repr, masks))))
            else:
                irc.reply("I'm not currently ignoring anyone in %s" % channel)
            return
        if ircdb.ignores.hostmasks:
            irc.reply(format('%L', (list(map(repr, ircdb.ignores.hostmasks)))))
        else:
            irc.reply("I'm not currently globally ignoring anyone")
    ignores = wrap(ignores, [optional("channel")])
Class = Encyclopedia  # module-level alias for the plugin class (presumably the name the bot's plugin loader looks up — verify)