implement requesting a ban review after a configured time and add some basic testcases

bot owners note that config group 'commentRequest' is now 'request'
request.ignore config now defaults to "FloodBot? FloodBotK? ChanServ"
This commit is contained in:
Elián Hanisch 2010-04-06 21:36:50 -03:00
commit 8612e2291f
3 changed files with 517 additions and 49 deletions

View File

@ -15,10 +15,12 @@
import supybot.conf as conf
import supybot.registry as registry
class ValidTypes(registry.OnlySomeStrings):
"""Invalid type, valid types are: 'removal', 'ban' or 'quiet'."""
validStrings = ('removal', 'ban', 'quiet')
class SpaceSeparatedListOfTypes(registry.SpaceSeparatedListOf):
Value = ValidTypes
@ -34,23 +36,27 @@ conf.registerGlobalValue(Bantracker, 'database',
conf.registerGlobalValue(Bantracker, 'bansite',
registry.String('', "Web site for the bantracker, without the 'bans.cgi' appended", private=True))
conf.registerChannelValue(Bantracker, 'commentRequest',
conf.registerChannelValue(Bantracker, 'request',
registry.Boolean(False,
"Enable message requests from bot"))
conf.registerChannelValue(Bantracker.commentRequest, 'type',
conf.registerChannelValue(Bantracker.request, 'type',
SpaceSeparatedListOfTypes(['removal', 'ban', 'quiet'],
"List of events for which the bot should request a comment."))
conf.registerChannelValue(Bantracker.commentRequest, 'ignore',
registry.SpaceSeparatedListOfStrings([],
"List of nicks for which the bot won't request to comment a ban/quiet/removal."\
conf.registerChannelValue(Bantracker.request, 'ignore',
registry.SpaceSeparatedListOfStrings(['FloodBot?', 'FloodBotK?', 'ChanServ'],
"List of nicks for which the bot won't request to comment or review."\
" Is case insensible and wildcards * ? are accepted."))
conf.registerChannelValue(Bantracker.commentRequest, 'forward',
conf.registerChannelValue(Bantracker.request, 'forward',
registry.SpaceSeparatedListOfStrings([],
"List of nicks for which the bot will forward the request to"\
" the channels/nicks defined in forwards.channels option."\
" Is case insensible and wildcards * ? are accepted."))
conf.registerChannelValue(Bantracker.commentRequest.forward, 'channels',
conf.registerChannelValue(Bantracker.request.forward, 'channels',
registry.SpaceSeparatedListOfStrings([],
"List of channels/nicks to forward the request if the op that set the ban/quiet"\
" is in the forward list."))
"List of channels/nicks to forward the request if the op is in the forward list."))
conf.registerGlobalValue(Bantracker.request, 'review',
registry.Float(7,
"Days after which the bot will request for review a ban. Can be an integer or decimal"
" value. Zero disables reviews."))

View File

@ -48,6 +48,7 @@ import supybot.callbacks as callbacks
import supybot.ircmsgs as ircmsgs
import supybot.conf as conf
import supybot.ircdb as ircdb
import supybot.schedule as schedule
from fnmatch import fnmatch
import sqlite
import pytz
@ -161,6 +162,59 @@ class Ban(object):
def time(self):
return datetime.datetime.fromtimestamp(self.when)
def guessBanType(mask):
if mask[0] == '%':
return 'quiet'
elif ircutils.isUserHostmask(mask) or mask.endswith('(realname)'):
return 'ban'
return 'removal'
class PersistentCache(dict):
def __init__(self, filename):
self.filename = conf.supybot.directories.data.dirize(filename)
self.time = 0
def open(self):
import csv
try:
reader = csv.reader(open(self.filename, 'rb'))
except IOError:
return
self.time = int(reader.next()[1])
for row in reader:
host, value = self.deserialize(*row)
try:
self[host].append(value)
except KeyError:
self[host] = [value]
def close(self):
import csv
try:
writer = csv.writer(open(self.filename, 'wb'))
except IOError:
return
writer.writerow(('time', str(int(self.time))))
for host, values in self.iteritems():
for v in values:
writer.writerow(self.serialize(host, v))
def deserialize(self, host, nick, command, channel, text):
if command == 'PRIVMSG':
msg = ircmsgs.privmsg(channel, text)
elif command == 'NOTICE':
msg = ircmsgs.notice(channel, text)
else:
return
return (host, (nick, msg))
def serialize(self, host, value):
nick, msg = value
command, channel, text = msg.command, msg.args[0], msg.args[1]
return (host, nick, command, channel, text)
class Bantracker(callbacks.Plugin):
"""Plugin to manage bans.
See '@list Bantracker' and '@help <command>' for commands"""
@ -174,10 +228,10 @@ class Bantracker(callbacks.Plugin):
self.lastMsgs = {}
self.lastStates = {}
self.replies = {}
self.logs = {}
self.logs = ircutils.IrcDict()
self.nicks = {}
self.hosts = {}
self.bans = {}
self.bans = ircutils.IrcDict()
self.thread_timer = threading.Timer(10.0, dequeue, args=(self,irc))
self.thread_timer.start()
@ -189,6 +243,15 @@ class Bantracker(callbacks.Plugin):
self.db = None
self.get_bans(irc)
self.get_nicks(irc)
self.pendingReviews = PersistentCache('bt.reviews.db')
self.pendingReviews.open()
# add scheduled event for check bans that need review, check every hour
try:
schedule.removeEvent(self.name())
except:
pass
schedule.addPeriodicEvent(lambda : self.reviewBans(irc), 60*60,
name=self.name())
def get_nicks(self, irc):
self.hosts.clear()
@ -260,7 +323,9 @@ class Bantracker(callbacks.Plugin):
"""Got ban"""
if msg.args[1] not in self.bans.keys():
self.bans[msg.args[1]] = []
self.bans[msg.args[1]].append(Ban(msg.args))
bans = self.bans[msg.args[1]]
bans.append(Ban(msg.args))
bans.sort(key=lambda x: x.when) # needed for self.reviewBans
def nick_to_host(self, irc=None, target='', with_nick=True, reply_now=True):
target = target.lower()
@ -295,6 +360,8 @@ class Bantracker(callbacks.Plugin):
except:
pass
queue.clear()
schedule.removeEvent(self.name())
self.pendingReviews.close()
def reset(self):
global queue
@ -340,41 +407,123 @@ class Bantracker(callbacks.Plugin):
self.db.commit()
return data
def requestComment(self, irc, channel, ban, type=None):
if not ban or not self.registryValue('commentRequest', channel):
return
# check if we should request a comment
if nickMatch(ban.who, self.registryValue('commentRequest.ignore', channel=channel)):
def requestComment(self, irc, channel, ban):
if not ban or not self.registryValue('request', channel) \
or nickMatch(ban.who, self.registryValue('request.ignore', channel)):
return
# check the type of the action taken
mask = ban.mask
if not type:
if mask[0] == '%':
type = 'quiet'
mask = mask[1:]
elif ircutils.isUserHostmask(mask) or mask.endswith('(realname)'):
type = 'ban'
else:
type = 'removal'
type = guessBanType(mask)
if type == 'quiet':
mask = mask[1:]
# check if type is enabled
if type not in self.registryValue('commentRequest.type', channel=channel):
if type not in self.registryValue('request.type', channel):
return
# send msg
prefix = conf.supybot.reply.whenAddressedBy.chars()[0] # prefix char for commands
# check to who send the request
if nickMatch(ban.who, self.registryValue('commentRequest.forward', channel=channel)):
channels = self.registryValue('commentRequest.forward.channels', channel=channel)
if channels:
s = "Please somebody comment on the %s of %s in %s done by %s, use:"\
" %scomment %s <comment>" %(type, mask, channel, ban.who, prefix, ban.id)
for chan in channels:
msg = ircmsgs.notice(chan, s)
irc.queueMsg(msg)
return
# send to op
s = "Please comment on the %s of %s in %s, use: %scomment %s <comment>" \
%(type, mask, channel, prefix, ban.id)
irc.reply(s, to=ban.who, private=True)
try:
nick = ircutils.nickFromHostmask(ban.who)
except:
nick = ban.who
if nickMatch(nick, self.registryValue('request.forward', channel)):
s = "Please somebody comment on the %s of %s in %s done by %s, use:"\
" %scomment %s <comment>" %(type, mask, channel, nick, prefix, ban.id)
self._sendForward(irc, s, channel)
else:
# send to op
s = "Please comment on the %s of %s in %s, use: %scomment %s <comment>" \
%(type, mask, channel, prefix, ban.id)
irc.reply(s, to=nick, private=True)
def reviewBans(self, irc=None):
reviewTime = int(self.registryValue('request.review') * 86400)
if not reviewTime:
# time is zero, do nothing
return
now = time.mktime(time.gmtime())
lastreview = self.pendingReviews.time
self.pendingReviews.time = now # update last time reviewed
if not lastreview:
# initialize last time reviewed timestamp
lastreview = now - reviewTime
for channel, bans in self.bans.iteritems():
if not self.registryValue('enabled', channel) \
or not self.registryValue('request', channel):
continue
for ban in bans:
if guessBanType(ban.mask) in ('quiet', 'removal'):
# skip mutes and kicks
continue
banAge = now - ban.when
reviewWindow = lastreview - ban.when
#self.log.debug('review ban: %s ban %s by %s (%s/%s/%s %s)', channel, ban.mask,
# ban.who, reviewWindow, reviewTime, banAge, reviewTime - reviewWindow)
if reviewWindow <= reviewTime < banAge:
# ban is old enough, and inside the "review window"
try:
# ban.who should be a user hostmask
nick = ircutils.nickFromHostmask(ban.who)
host = ircutils.hostFromHostmask(ban.who)
except:
if ircutils.isNick(ban.who, strictRfc=True):
# ok, op's nick, use it
nick = ban.who
host = None
else:
# probably a ban restored by IRC server in a netsplit
# XXX see if something can be done about this
continue
if nickMatch(nick, self.registryValue('request.ignore', channel)):
# in the ignore list
continue
if not ban.id:
ban.id = self.get_banId(ban.mask, channel)
if nickMatch(nick, self.registryValue('request.forward', channel)):
s = "Hi, please somebody review the ban '%s' set by %s on %s in"\
" %s, link: %s/bans.cgi?log=%s" %(ban.mask, nick, ban.ascwhen, channel,
self.registryValue('bansite'), ban.id)
self._sendForward(irc, s, channel)
else:
s = "Hi, please review the ban '%s' that you set on %s in %s, link:"\
" %s/bans.cgi?log=%s" %(ban.mask, ban.ascwhen, channel,
self.registryValue('bansite'), ban.id)
msg = ircmsgs.privmsg(nick, s)
if host in self.pendingReviews:
self.pendingReviews[host].append((nick, msg))
else:
self.pendingReviews[host] = [(nick, msg)]
elif banAge < reviewTime:
# since we made sure bans are sorted by time, the bans left are more recent
break
def _sendForward(self, irc, s, channel=None):
if not irc:
return
for chan in self.registryValue('request.forward.channels', channel=channel):
msg = ircmsgs.notice(chan, s)
irc.queueMsg(msg)
def _sendReviews(self, irc, msg):
host = ircutils.hostFromHostmask(msg.prefix)
if host in self.pendingReviews:
for nick, m in self.pendingReviews[host]:
if msg.nick != nick and not irc.isChannel(nick): # I'm a bit extra careful here
# correct nick in msg
m = ircmsgs.privmsg(msg.nick, m.args[1])
irc.queueMsg(m)
del self.pendingReviews[host]
# check if we have any reviews by nick to send
if None in self.pendingReviews:
L = self.pendingReviews[None]
for i, v in enumerate(L):
nick, m = v
if ircutils.strEqual(msg.nick, nick):
irc.queueMsg(m)
del L[i]
if not L:
del self.pendingReviews[None]
def doLog(self, irc, channel, s):
if not self.registryValue('enabled', channel):
@ -393,12 +542,16 @@ class Bantracker(callbacks.Plugin):
self.requestComment(irc, channel, ban)
return ban
def _doKickban(self, irc, channel, nick, target, kickmsg = None, use_time = None, extra_comment = None):
def _doKickban(self, irc, channel, operator, target, kickmsg = None, use_time = None, extra_comment = None):
if not self.registryValue('enabled', channel):
return
n = now()
if use_time:
n = fromTime(use_time)
try:
nick = ircutils.nickFromHostmask(operator)
except:
nick = operator
id = self.db_run("INSERT INTO bans (channel, mask, operator, time, log) values(%s, %s, %s, %s, %s)",
(channel, target, nick, n, '\n'.join(self.logs[channel])), expect_id=True)
if kickmsg and id and not (kickmsg == nick):
@ -407,7 +560,7 @@ class Bantracker(callbacks.Plugin):
self.db_run("INSERT INTO comments (ban_id, who, comment, time) values(%s,%s,%s,%s)", (id, nick, extra_comment, n))
if channel not in self.bans:
self.bans[channel] = []
ban = Ban(mask=target, who=nick, when=time.mktime(time.gmtime()), id=id)
ban = Ban(mask=target, who=operator, when=time.mktime(time.gmtime()), id=id)
self.bans[channel].append(ban)
return ban
@ -437,6 +590,7 @@ class Bantracker(callbacks.Plugin):
'* %s %s\n' % (nick, ircmsgs.unAction(msg)))
else:
self.doLog(irc, channel, '<%s> %s\n' % (nick, text))
self._sendReviews(irc, msg)
def doNotice(self, irc, msg):
(recipients, text) = msg.args
@ -488,14 +642,14 @@ class Bantracker(callbacks.Plugin):
else:
self.doLog(irc, channel,
'*** %s was kicked by %s\n' % (target, msg.nick))
self.doKickban(irc, channel, msg.nick, target, kickmsg, extra_comment=host)
self.doKickban(irc, channel, msg.prefix, target, kickmsg, extra_comment=host)
def doPart(self, irc, msg):
for channel in msg.args[0].split(','):
self.doLog(irc, channel, '*** %s (%s) has left %s (%s)\n' % (msg.nick, msg.prefix, channel, len(msg.args) > 1 and msg.args[1] or ''))
if len(msg.args) > 1 and msg.args[1].startswith('requested by'):
args = msg.args[1].split()
self.doKickban(irc, channel, args[2], msg.nick, ' '.join(args[3:]).strip(), extra_comment=msg.prefix)
self.doKickban(irc, channel, args[2], msg.prefix, ' '.join(args[3:]).strip(), extra_comment=msg.prefix)
def doMode(self, irc, msg):
channel = msg.args[0]
@ -522,7 +676,7 @@ class Bantracker(callbacks.Plugin):
if param[0] in ('+b', '+q'):
comment = self.getHostFromBan(irc, msg, mask)
self.doKickban(irc, channel, msg.nick, mask + realname, extra_comment=comment)
self.doKickban(irc, channel, msg.prefix, mask + realname, extra_comment=comment)
elif param[0] in ('-b', '-q'):
self.doUnban(irc,channel, msg.nick, mask + realname)
@ -658,7 +812,7 @@ class Bantracker(callbacks.Plugin):
hostmask = self.nick_to_host(irc, target)
self.doLog(irc, channel.lower(), '*** %s requested a mark for %s\n' % (msg.nick, target))
self._doKickban(irc, channel.lower(), msg.nick, hostmask, kickmsg)
self._doKickban(irc, channel.lower(), msg.prefix, hostmask, kickmsg)
irc.replySuccess()
mark = wrap(mark, [optional('channel'), 'something', additional('text')])
@ -672,8 +826,10 @@ class Bantracker(callbacks.Plugin):
return mutes + bans
def get_banId(self, mask, channel):
data = self.db_run("SELECT MAX(id) FROM bans WHERE mask=%s AND channel=%s", (mask, channel), True)[0]
if not data[0]:
data = self.db_run("SELECT MAX(id) FROM bans WHERE mask=%s AND channel=%s", (mask, channel), True)
if data:
data = data[0]
if not data or not data[0]:
return
return int(data[0])
@ -953,4 +1109,23 @@ class Bantracker(callbacks.Plugin):
irc.reply("%s/bans.cgi?log=%s&mark=%s" % (self.registryValue('bansite'), id, highlight), private=True)
banlink = wrap(banlink, ['id', optional('somethingWithoutSpaces')])
def banreview(self, irc, msg, args):
"""
Lists pending ban reviews."""
if not self.check_auth(irc, msg, args):
return
count = {}
for reviews in self.pendingReviews.itervalues():
for nick, msg in reviews:
try:
count[nick] += 1
except KeyError:
count[nick] = 1
total = sum(count.itervalues())
s = ' '.join([ '%s:%s' %pair for pair in count.iteritems() ])
s = 'Pending ban reviews (%s): %s' %(total, s)
irc.reply(s)
banreview = wrap(banreview)
Class = Bantracker

View File

@ -1,4 +1,291 @@
from supybot.test import *
class BantrackerTestCase(PluginTestCase):
import supybot.conf as conf
import supybot.ircmsgs as ircmsgs
import supybot.world as world
import time
pluginConf = conf.supybot.plugins.Bantracker
pluginConf.enabled.setValue(True)
pluginConf.bansite.setValue('http://foo.bar.com')
pluginConf.database.setValue('bantracker-test.db')
def quiet(channel, hostmask, prefix='', msg=None):
"""Returns a MODE to quiet nick on channel."""
return ircmsgs.mode(channel, ('+q', hostmask), prefix, msg)
class BantrackerTestCase(ChannelPluginTestCase):
plugins = ('Bantracker',)
def setUp(self):
super(BantrackerTestCase, self).setUp()
pluginConf.request.setValue(False) # disable comments
pluginConf.request.ignore.set('')
pluginConf.request.forward.set('')
pluginConf.request.review.setValue(1.0/86400) # one second
self.setDb()
# Bantracker for some reason doesn't use Supybot's own methods for check capabilities,
# so it doesn't have a clue about testing and screws my tests by default.
# This would fix it until I bring myself to take a look
cb = self.getCallback()
f = cb.check_auth
def test_check_auth(*args, **kwargs):
if world.testing:
return True
else:
return f(*args, **kwargs)
cb.check_auth = test_check_auth
def setDb(self):
import sqlite, os
dbfile = os.path.join(os.curdir, pluginConf.database())
try:
os.remove(dbfile)
except:
pass
db = sqlite.connect(dbfile)
cursor = db.cursor()
cursor.execute('CREATE TABLE bans ('
'id INTEGER PRIMARY KEY,'
'channel VARCHAR(30) NOT NULL,'
'mask VARCHAR(100) NOT NULL,'
'operator VARCHAR(30) NOT NULL,'
'time VARCHAR(300) NOT NULL,'
'removal DATETIME,'
'removal_op VARCHAR(30),'
'log TEXT)')
cursor.execute('CREATE TABLE comments ('
'ban_id INTEGER,'
'who VARCHAR(100) NOT NULL,'
'comment MEDIUMTEXT NOT NULL,'
'time VARCHAR(300) NOT NULL)')
cursor.execute('CREATE TABLE sessions ('
'session_id VARCHAR(50) PRIMARY KEY,'
'user MEDIUMTEXT NOT NULL,'
'time INT NOT NULL)')
cursor.execute('CREATE TABLE users ('
'username VARCHAR(50) PRIMARY KEY,'
'salt VARCHAR(8),'
'password VARCHAR(50))')
db.commit()
cursor.close()
db.close()
def getCallback(self):
for cb in self.irc.callbacks:
if cb.name() == 'Bantracker':
break
return cb
def getDb(self):
return self.getCallback().db
def query(self, query, parms=()):
cursor = self.getDb().cursor()
cursor.execute(query, parms)
return cursor.fetchall()
def feedBan(self, hostmask, prefix='', channel=None, mode='b'):
if not channel:
channel = self.channel
if not prefix:
prefix = 'op!user@host.net'
if mode == 'b':
ban = ircmsgs.ban(channel, hostmask, prefix=prefix)
elif mode == 'q':
ban = quiet(channel, hostmask, prefix=prefix)
elif mode == 'k':
ban = ircmsgs.kick(channel, hostmask, s='kthxbye!', prefix=prefix)
elif mode == 'p':
ban = ircmsgs.part(channel, prefix=hostmask,
s='requested by %s (kthxbye!)' %prefix[:prefix.find('!')])
self.irc.feedMsg(ban)
return ban
def testComment(self):
pluginConf.request.setValue(True)
# test bans
self.feedBan('asd!*@*')
msg = self.irc.takeMsg()
self.assertEqual(str(msg).strip(),
"PRIVMSG op :Please comment on the ban of asd!*@* in #test, use: @comment 1"
" <comment>")
# test quiets
self.feedBan('dude!*@*', mode='q')
msg = self.irc.takeMsg()
self.assertEqual(str(msg).strip(),
"PRIVMSG op :Please comment on the quiet of dude!*@* in #test, use: @comment 2"
" <comment>")
# test kick/part
self.feedBan('dude', mode='k')
msg = self.irc.takeMsg()
self.assertEqual(str(msg).strip(),
"PRIVMSG op :Please comment on the removal of dude in #test, use: @comment 3"
" <comment>")
self.feedBan('dude', mode='p')
msg = self.irc.takeMsg()
self.assertEqual(str(msg).strip(),
"PRIVMSG op :Please comment on the removal of dude in #test, use: @comment 4"
" <comment>")
def testCommentForward(self):
pluginConf.request.setValue(True)
pluginConf.request.forward.set('bot')
pluginConf.request.forward.channels.set('#channel')
self.feedBan('qwe!*@*')
msg = self.irc.takeMsg()
self.assertEqual(str(msg).strip(),
"PRIVMSG op :Please comment on the ban of qwe!*@* in #test, use: @comment 1"
" <comment>")
self.feedBan('zxc!*@*', prefix='bot!user@host.com')
msg = self.irc.takeMsg()
self.assertEqual(str(msg).strip(),
"NOTICE #channel :Please somebody comment on the ban of zxc!*@* in #test done by bot,"
" use: @comment 2 <comment>")
def testReview(self):
pluginConf.request.setValue(True)
cb = self.getCallback()
self.feedBan('asd!*@*')
self.irc.takeMsg() # ignore comment request comment
cb.reviewBans()
self.assertFalse(cb.pendingReviews)
print 'waiting 4 secs..'
time.sleep(2)
cb.reviewBans()
# check is pending
self.assertTrue(cb.pendingReviews)
# send msg if a user with a matching host says something
self.feedMsg('Hi!', frm='op!user@fakehost.net')
msg = self.irc.takeMsg()
self.assertEqual(msg, None)
self.feedMsg('Hi!', frm='op_!user@host.net')
msg = self.irc.takeMsg()
self.assertEqual(str(msg).strip(),
"PRIVMSG op_ :Hi, please review the ban 'asd!*@*' that you set on %s in #test, link: "\
"%s/bans.cgi?log=1" %(cb.bans['#test'][0].ascwhen, pluginConf.bansite()))
# don't ask again
cb.reviewBans()
self.assertFalse(cb.pendingReviews)
# test again with two ops
self.feedBan('asd2!*@*')
self.irc.takeMsg()
self.feedBan('qwe!*@*', prefix='otherop!user@home.net')
self.irc.takeMsg()
time.sleep(2)
cb.reviewBans()
self.assertTrue(len(cb.pendingReviews) == 2)
self.feedMsg('Hi!', frm='op!user@fakehost.net')
msg = self.irc.takeMsg()
self.assertEqual(msg, None)
self.assertResponse('banreview', 'Pending ban reviews (2): otherop:1 op:1')
self.feedMsg('Hi!', frm='mynickissocreative!user@home.net')
msg = self.irc.takeMsg()
self.assertEqual(str(msg).strip(),
"PRIVMSG mynickissocreative :Hi, please review the ban 'qwe!*@*' that you set on %s in #test, link: "\
"%s/bans.cgi?log=3" %(cb.bans['#test'][2].ascwhen, pluginConf.bansite()))
self.feedMsg('ping', to='test', frm='op!user@host.net') # in a query
self.irc.takeMsg() # drop pong reply
msg = self.irc.takeMsg()
self.assertEqual(str(msg).strip(),
"PRIVMSG op :Hi, please review the ban 'asd2!*@*' that you set on %s in #test, link: "\
"%s/bans.cgi?log=2" %(cb.bans['#test'][1].ascwhen, pluginConf.bansite()))
def testReviewForward(self):
pluginConf.request.setValue(True)
pluginConf.request.forward.set('bot')
pluginConf.request.forward.channels.set('#channel')
cb = self.getCallback()
self.feedBan('asd!*@*', prefix='bot!user@host.net')
self.irc.takeMsg() # ignore comment request comment
cb.reviewBans(self.irc)
self.assertFalse(cb.pendingReviews)
print 'waiting 2 secs..'
time.sleep(2)
cb.reviewBans(self.irc)
# since it's a forward, it was sent already
self.assertFalse(cb.pendingReviews)
msg = self.irc.takeMsg()
self.assertEqual(str(msg).strip(),
"NOTICE #channel :Hi, please somebody review the ban 'asd!*@*' set by bot on %s in #test, link: "\
"%s/bans.cgi?log=1" %(cb.bans['#test'][0].ascwhen, pluginConf.bansite()))
def testReviewNickFallback(self):
"""If for some reason we don't have ops full hostmask, revert to nick match. This may be
needed in the future as hostmasks aren't stored in the db."""
pluginConf.request.setValue(True)
cb = self.getCallback()
self.feedBan('asd!*@*')
self.irc.takeMsg() # ignore comment request comment
cb.bans['#test'][0].who = 'op' # replace hostmask by nick
print 'waiting 2 secs..'
time.sleep(2)
cb.reviewBans()
# check is pending
self.assertTrue(cb.pendingReviews)
self.assertResponse('banreview', 'Pending ban reviews (1): op:1')
# send msg if a user with a matching nick says something
self.feedMsg('Hi!', frm='op_!user@host.net')
msg = self.irc.takeMsg()
self.assertEqual(msg, None)
self.feedMsg('Hi!', frm='op!user@host.net')
msg = self.irc.takeMsg()
self.assertEqual(str(msg).strip(),
"PRIVMSG op :Hi, please review the ban 'asd!*@*' that you set on %s in #test, link: "\
"%s/bans.cgi?log=1" %(cb.bans['#test'][0].ascwhen, pluginConf.bansite()))
# check not pending anymore
self.assertFalse(cb.pendingReviews)
def testPersistentCache(self):
"""Save pending reviews and when bans were last checked. This is needed for plugin
reloads"""
msg1 = ircmsgs.privmsg('nick', 'Hello World')
msg2 = ircmsgs.privmsg('nick', 'Hello World')
msg3 = ircmsgs.notice('#chan', 'Hello World')
msg4 = ircmsgs.privmsg('nick_', 'Hello World')
pr = self.getCallback().pendingReviews
pr['host.net'] = [('op', msg1), ('op', msg2), ('op_', msg3)]
pr['home.net'] = [('dude', msg4)]
self.assertResponse('banreview', 'Pending ban reviews (4): op_:1 dude:1 op:2')
pr.close()
pr.clear()
pr.open()
self.assertResponse('banreview', 'Pending ban reviews (4): op_:1 dude:1 op:2')
items = pr['host.net']
self.assertTrue(items[0][0] == 'op' and items[0][1] == msg1)
self.assertTrue(items[1][0] == 'op' and items[1][1] == msg2)
self.assertTrue(items[2][0] == 'op_' and items[2][1] == msg3)
items = pr['home.net']
self.assertTrue(items[0][0] == 'dude' and items[0][1] == msg4)
def testReviewBanreview(self):
pr = self.getCallback().pendingReviews
m = ircmsgs.privmsg('#test', 'asd')
pr['host.net'] = [('op', m), ('op_', m), ('op', m)]
pr['home.net'] = [('dude', m)]
self.assertResponse('banreview', 'Pending ban reviews (4): op_:1 dude:1 op:2')
def testBan(self):
self.feedBan('asd!*@*')
fetch = self.query("SELECT id,channel,mask,operator FROM bans")
self.assertEqual((1, '#test', 'asd!*@*', 'op'), fetch[0])
def testQuiet(self):
self.feedBan('asd!*@*', mode='q')
fetch = self.query("SELECT id,channel,mask,operator FROM bans")
self.assertEqual((1, '#test', '%asd!*@*', 'op'), fetch[0])
def testKick(self):
self.feedBan('troll', mode='k')
fetch = self.query("SELECT id,channel,mask,operator FROM bans")
self.assertEqual((1, '#test', 'troll', 'op'), fetch[0])
def testPart(self):
self.feedBan('troll!user@trollpit.net', mode='p')
fetch = self.query("SELECT id,channel,mask,operator FROM bans")
self.assertEqual((1, '#test', 'troll!user@trollpit.net', 'op'), fetch[0])