139 lines
4.7 KiB
Python
139 lines
4.7 KiB
Python
from supybot.test import *
|
|
|
|
import supybot.conf as conf
|
|
import supybot.ircmsgs as ircmsgs
|
|
|
|
import time
|
|
|
|
|
|
# Shortcut to the Bantracker plugin's configuration group; the helpers and
# tests below read and modify settings through it.
pluginConf = conf.supybot.plugins.Bantracker

# Module-level test configuration: enable the plugin, set the ban site URL
# that the review-request test expects in its link, and use a dedicated
# database file that setDb() recreates for every test.
pluginConf.enabled.setValue(True)
pluginConf.bansite.setValue('http://foo.bar.com')
pluginConf.database.setValue('bantracker-test.db')
|
|
|
|
def quiet(channel, hostmask, prefix='', msg=None):
    """Build the MODE message that sets +q (quiet) on hostmask in channel.

    prefix and msg are handed to ircmsgs.mode() unchanged.
    """
    modeArgs = ('+q', hostmask)
    return ircmsgs.mode(channel, modeArgs, prefix, msg)
|
|
|
|
class BantrackerTestCase(ChannelPluginTestCase):
    """Tests for the Bantracker plugin.

    Every test starts from a freshly created database (see setDb) and
    simulates an operator setting a ban or quiet by feeding the matching
    MODE message to the test IRC object.
    """
    plugins = ('Bantracker',)

    def setUp(self):
        """Run the base setUp, recreate the database, disable comment requests."""
        super(BantrackerTestCase, self).setUp()
        self.setDb()
        # Comment requests are disabled by default; the tests that exercise
        # them reset this value to '' themselves.
        pluginConf.commentRequest.ignore.set('*')

    def setDb(self):
        """Delete any existing test database file and create empty tables."""
        import os
        import sqlite
        dbfile = os.path.join(os.curdir, pluginConf.database())
        try:
            os.remove(dbfile)
        except OSError:
            # Best effort: typically the file simply does not exist yet.
            pass
        db = sqlite.connect(dbfile)
        try:
            cursor = db.cursor()
            cursor.execute('CREATE TABLE bans ('
                           'id INTEGER PRIMARY KEY,'
                           'channel VARCHAR(30) NOT NULL,'
                           'mask VARCHAR(100) NOT NULL,'
                           'operator VARCHAR(30) NOT NULL,'
                           'time VARCHAR(300) NOT NULL,'
                           'removal DATETIME,'
                           'removal_op VARCHAR(30),'
                           'log TEXT)')
            cursor.execute('CREATE TABLE comments ('
                           'ban_id INTEGER,'
                           'who VARCHAR(100) NOT NULL,'
                           'comment MEDIUMTEXT NOT NULL,'
                           'time VARCHAR(300) NOT NULL)')
            cursor.execute('CREATE TABLE sessions ('
                           'session_id VARCHAR(50) PRIMARY KEY,'
                           'user MEDIUMTEXT NOT NULL,'
                           'time INT NOT NULL)')
            cursor.execute('CREATE TABLE users ('
                           'username VARCHAR(50) PRIMARY KEY,'
                           'salt VARCHAR(8),'
                           'password VARCHAR(50))')
            db.commit()
            cursor.close()
        finally:
            # Always release the connection, even if a statement failed.
            db.close()

    def getCallback(self):
        """Return the callback named 'Bantracker' from self.irc.callbacks.

        Fails the test if no such callback is registered, instead of
        silently handing back an unrelated callback.
        """
        for cb in self.irc.callbacks:
            if cb.name() == 'Bantracker':
                return cb
        self.fail('Bantracker callback is not loaded')

    def getDb(self):
        """Return the `db` attribute of the Bantracker callback."""
        return self.getCallback().db

    def query(self, query, parms=()):
        """Execute query with parms on the plugin's database; return all rows."""
        cursor = self.getDb().cursor()
        cursor.execute(query, parms)
        return cursor.fetchall()

    def feedBan(self, hostmask, prefix='', channel=None, mode='b'):
        """Feed the bot a ban ('b') or quiet ('q') of hostmask.

        channel defaults to self.channel and prefix to 'op!user@host.net'.
        Raises ValueError for any other mode.
        """
        if not channel:
            channel = self.channel
        if not prefix:
            prefix = 'op!user@host.net'
        if mode == 'b':
            ban = ircmsgs.ban(channel, hostmask, prefix=prefix)
        elif mode == 'q':
            ban = quiet(channel, hostmask, prefix=prefix)
        else:
            raise ValueError("unsupported mode %r, expected 'b' or 'q'"
                             % (mode,))
        self.irc.feedMsg(ban)

    def testCommentRequest(self):
        # With the ignore list emptied, each ban/quiet should produce a
        # private message asking the operator for a comment.
        pluginConf.commentRequest.ignore.set('')
        # test bans
        self.feedBan('asd!*@*')
        msg = self.irc.takeMsg()
        self.assertEqual(str(msg).strip(),
            "PRIVMSG op :Please comment on the ban of asd!*@* in #test, use: @comment 1"
            " <comment>")
        # test quiets
        self.feedBan('dude!*@*', mode='q')
        msg = self.irc.takeMsg()
        self.assertEqual(str(msg).strip(),
            "PRIVMSG op :Please comment on the quiet of dude!*@* in #test, use: @comment 2"
            " <comment>")

    def testReviewResquest(self):
        # NOTE: the misspelt method name is kept so that selecting this test
        # by name keeps working.
        pluginConf.commentRequest.ignore.set('')
        cb = self.getCallback()
        self.feedBan('asd!*@*')
        self.irc.takeMsg() # ignore comment request comment
        # One second; assumes reviewAfterTime is expressed in days
        # (86400 seconds per day) -- TODO confirm against the plugin config.
        pluginConf.reviewAfterTime.setValue(1.0/86400)
        cb.reviewBans()
        self.assertFalse(cb.pendingReviews)
        print('waiting 2 secs..')
        time.sleep(2)
        cb.reviewBans()
        # check is pending
        self.assertTrue(cb.pendingReviews)
        # check msg if op and only op says something
        self.feedMsg('Hi!', frm='dude!user@host.net')
        msg = self.irc.takeMsg()
        self.assertEqual(msg, None)
        self.feedMsg('Hi!', frm='op!user@host.net')
        msg = self.irc.takeMsg()
        expected = (
            "PRIVMSG op :Hi, please review the ban 'asd!*@*' that you set on %s in #test, link: "
            "%s/bans.cgi?log=1") % (cb.bans['#test'][-1].ascwhen,
                                    pluginConf.bansite())
        self.assertEqual(str(msg).strip(), expected)
        # don't ask again
        cb.reviewBans()
        self.assertFalse(cb.pendingReviews)

    def testBan(self):
        # A ban fed to the bot should be recorded as the first row of `bans`.
        self.feedBan('asd!*@*')
        fetch = self.query("SELECT id,channel,mask,operator FROM bans")
        self.assertEqual((1, '#test', 'asd!*@*', 'op'), fetch[0])

    def testQuiet(self):
        # A quiet is recorded like a ban, with the mask prefixed by '%'.
        self.feedBan('asd!*@*', mode='q')
        fetch = self.query("SELECT id,channel,mask,operator FROM bans")
        self.assertEqual((1, '#test', '%asd!*@*', 'op'), fetch[0])
|
|
|
|
|
|
|