From 7473bf6fa74be0e595fdd05f346b9398a06a3dc1 Mon Sep 17 00:00:00 2001 From: Krytarik Raido Date: Sat, 20 Jun 2020 23:04:04 +0200 Subject: [PATCH] Bugtracker: Fix regression in persistency. * Make tracker settings persist across restarts. * Separate the various parts more properly. * Update default trackers. --- Bugtracker/__init__.py | 4 +- Bugtracker/config.py | 51 ++- Bugtracker/plugin.py | 752 ++++------------------------------------- Bugtracker/trackers.py | 632 ++++++++++++++++++++++++++++++++++ 4 files changed, 746 insertions(+), 693 deletions(-) create mode 100644 Bugtracker/trackers.py diff --git a/Bugtracker/__init__.py b/Bugtracker/__init__.py index 9dc66c4..7742af2 100644 --- a/Bugtracker/__init__.py +++ b/Bugtracker/__init__.py @@ -24,7 +24,7 @@ import supybot.world as world from imp import reload -__version__ = "4.1.0" +__version__ = "4.2.0" __author__ = supybot.Author("Krytarik Raido", "krytarik", "krytarik@tuxgarage.com") __contributors__ = { supybot.Author("Dennis Kaarsemaker", "Seveas", "dennis@kaarsemaker.net"): ['Original Author'], @@ -36,6 +36,8 @@ from . import config reload(config) from . import plugin reload(plugin) +from . import trackers +reload(trackers) if world.testing: from . import test diff --git a/Bugtracker/config.py b/Bugtracker/config.py index 8195c7b..c666691 100644 --- a/Bugtracker/config.py +++ b/Bugtracker/config.py @@ -2,6 +2,7 @@ ### # Copyright (c) 2005-2007 Dennis Kaarsemaker # Copyright (c) 2008-2011 Terence Simpson +# Copyright (c) 2017- Krytarik Raido # # This program is free software; you can redistribute it and/or modify # it under the terms of version 2 of the GNU General Public License as @@ -17,6 +18,12 @@ import supybot.conf as conf import supybot.registry as registry import supybot.ircutils as ircutils +import supybot.log as supylog + +from .trackers import defined_bugtrackers + +class Bugtrackers(registry.SpaceSeparatedListOfStrings): + List = ircutils.IrcSet def configure(advanced): from supybot.questions import expect, something, yn, output @@ -43,8 +50,8 @@ def configure(advanced): return repeatdelay output("Each of the next 3 questions can be set per-channel with the '@config channel' command.") - bugSnarfer = yn("Enable detecting bugs numbers and URL in all channels?", default=Bugtracker.bugSnarfer._default) - cveSnarfer = yn("Enable detecting CVE numbers and URL in all channels?", default=Bugtracker.cveSnarfer._default) + bugSnarfer = yn("Enable detecting bug numbers and URLs in all channels?", default=Bugtracker.bugSnarfer._default) + cveSnarfer = yn("Enable detecting CVE numbers and URLs in all channels?", default=Bugtracker.cveSnarfer._default) oopsSnarfer = yn("Enable detecting Launchpad OOPS IDs in all channels?", default=Bugtracker.oopsSnarfer._default) if advanced: replyNoBugtracker = something("What should the bot reply with when a user requests information from an unknown bug tracker?", default=Bugtracker.replyNoBugtracker._default) @@ -101,8 +108,8 @@ conf.registerChannelValue(Bugtracker, 'replyNoBugtracker', conf.registerChannelValue(Bugtracker, 'snarfTarget', registry.String('launchpad', """Determines the bugtracker to query when the snarf command is triggered""")) -conf.registerGroup(Bugtracker, 'bugtrackers', - help="""Determines what bugtrackers will be added to the bot when it starts.""") +conf.registerGlobalValue(Bugtracker, 'bugtrackers', + Bugtrackers([], """Determines what bugtrackers will be added to the bot when it starts.""")) conf.registerGlobalValue(Bugtracker, 'replyWhenNotFound', registry.Boolean(False, """Whether to send a message when a bug could not be found""")) @@ -121,3 +128,39 @@ conf.registerChannelValue(Bugtracker, 'extended', conf.registerGlobalValue(Bugtracker, 'saveDiscoveredTrackers', registry.Boolean(False, """Whether to save automatically discovered trackers to configuration""")) + +def registerBugtracker(name, url='', description='', trackertype='', aliases=[]): + Bugtracker.bugtrackers().add(name) + group = conf.registerGroup(Bugtracker.bugtrackers, name) + URL = conf.registerGlobalValue(group, 'url', registry.String('', 'URL of the tracker.')) + DESC = conf.registerGlobalValue(group, 'description', registry.String('', 'Full name of the tracker.')) + TRACKERTYPE = conf.registerGlobalValue(group, 'trackertype', registry.String('', 'Type of the tracker.')) + ALIASES = conf.registerGlobalValue(group, 'aliases', registry.SpaceSeparatedSetOfStrings([], 'Aliases of the tracker.')) + if url: + URL.setValue(url) + if description: + DESC.setValue(description) + if aliases: + ALIASES.setValue(aliases) + if trackertype: + if trackertype in defined_bugtrackers: + TRACKERTYPE.setValue(trackertype) + else: + supylog.warning("Bugtracker: Unknown trackertype '%s' (%s)" % (trackertype, name)) + +for name in Bugtracker.bugtrackers(): + registerBugtracker(name) + +default_bugtrackers = { + 'mozilla': ('https://bugzilla.mozilla.org', 'Mozilla', 'bugzilla', []), + 'gtk': ('https://gitlab.gnome.org/GNOME/gtk/-/issues', 'GTK', 'gitlab', []), + 'kde': ('https://bugs.kde.org', 'KDE', 'bugzilla', []), + 'lxde': ('https://sourceforge.net/p/lxde/bugs', 'LXDE', 'sourceforge', []), + 'openoffice': ('https://bz.apache.org/ooo', 'OpenOffice', 'bugzilla', []), + 'launchpad': ('https://launchpad.net', 'Launchpad', 'launchpad', ['lp', 'ubuntu', 'ubottu']), + 'debian': ('https://bugs.debian.org', 'Debian', 'debbugs', []), + 'irssi': ('https://github.com/irssi/irssi/issues', 'Irssi', 'github', []), + 'mantis': ('https://www.mantisbt.org/bugs', 'Mantis', 'mantis', []), + 'trac': ('https://trac.edgewall.org/ticket', 'Trac', 'trac', []), + 'pidgin': ('https://developer.pidgin.im/ticket', 'Pidgin', 'trac', []) +} diff --git a/Bugtracker/plugin.py b/Bugtracker/plugin.py index 354b951..bfa4c29 100644 --- a/Bugtracker/plugin.py +++ b/Bugtracker/plugin.py @@ -24,28 +24,11 @@ import supybot.conf as conf import supybot.registry as registry import supybot.log as supylog -import re, os, sys, time, json -import xml.dom.minidom as minidom -from email.parser import FeedParser -from pysimplesoap.client import SoapClient +import re, time -def registerBugtracker(name, url='', description='', trackertype='', aliases=[]): - group = conf.registerGroup(conf.supybot.plugins.Bugtracker.bugtrackers, name) - URL = conf.registerGlobalValue(group, 'url', registry.String(url, '')) - DESC = conf.registerGlobalValue(group, 'description', registry.String(description, '')) - TRACKERTYPE = conf.registerGlobalValue(group, 'trackertype', registry.String(trackertype, '')) - ALIASES = conf.registerGlobalValue(group, 'aliases', registry.SpaceSeparatedSetOfStrings(aliases, '')) - if url: - URL.setValue(url) - if description: - DESC.setValue(description) - if aliases: - ALIASES.setValue(aliases) - if trackertype: - if trackertype in defined_bugtrackers: - TRACKERTYPE.setValue(trackertype) - else: - raise BugtrackerError("Unknown trackertype: %s" % trackertype) +from .config import registerBugtracker, default_bugtrackers +from .trackers import defined_bugtrackers +from . import trackers def defaultIgnored(hostmask, recipient): if not conf.supybot.defaultIgnore(): @@ -84,40 +67,6 @@ def checkAddressed(text, channel): return True return False -def _getnodetxt(node): - L = [] - for childnode in node.childNodes: - if childnode.nodeType == childnode.TEXT_NODE: - L.append(childnode.data) - if not L: - raise ValueError("No text nodes") - val = ''.join(L) - if node.hasAttribute('encoding'): - encoding = node.getAttribute('encoding') - if encoding == 'base64': - try: - val = val.decode('base64') - except: - val = 'Cannot convert bug data from base64.' - return utils.web.htmlToText(val, tagReplace='') - -def _getnodeattr(node, attr): - if node.hasAttribute(attr): - val = node.getAttribute(attr) - else: - raise ValueError("No such attribute") - return utils.web.htmlToText(val, tagReplace='') - -class BugtrackerError(Exception): - """A bugtracker error""" - pass - -class BugNotFoundError(Exception): - """Pity, bug isn't there""" - pass - -cvere = re.compile(r']*>Description.*?]*>\s*(?P.*?)\s*', re.I | re.DOTALL) -cverre = re.compile(r']*>\s*(?P.*?)\s*', re.I | re.DOTALL) class Bugtracker(callbacks.PluginRegexp): """Show a link to a bug report with a brief description""" threaded = True @@ -133,18 +82,19 @@ class Bugtracker(callbacks.PluginRegexp): def set_trackers(self): self.db = ircutils.IrcDict() self.aliases = {} - trackers = self.registryValue('bugtrackers', value=False)._children + trackers = self.registryValue('bugtrackers') if not trackers: for (name, (url, description, trackertype, aliases)) in list(default_bugtrackers.items()): registerBugtracker(name, url, description, trackertype, aliases) - for name in list(trackers.keys()): - if trackers[name].trackertype() in defined_bugtrackers: - self.db[name] = defined_bugtrackers[trackers[name].trackertype()](name, trackers[name].url(), - trackers[name].description(), trackers[name].trackertype(), trackers[name].aliases()) - for a in trackers[name].aliases(): + for name in trackers: + tracker = self.registryValue('bugtrackers', value=False).get(name) + if tracker.trackertype() in defined_bugtrackers: + self.db[name] = defined_bugtrackers[tracker.trackertype()](name, tracker.url(), + tracker.description(), tracker.trackertype(), tracker.aliases()) + for a in tracker.aliases(): self.aliases[a] = name else: - supylog.warning("Bugtracker: Unknown trackertype: %s (%s)" % (trackers[name].trackertype(), name)) + supylog.warning("Bugtracker: Unknown trackertype '%s' (%s)" % (trackers[name].trackertype(), name)) self.shorthand = utils.abbrev(list(self.db.keys())) def is_ok(self, channel, tracker, bug): @@ -195,9 +145,10 @@ class Bugtracker(callbacks.PluginRegexp): for a in self.db[name].aliases: del self.aliases[a] del self.db[name] - group = self.registryValue('bugtrackers', value=False) - if name in group._children: - group.unregister(name) + trackers = self.registryValue('bugtrackers') + if name in trackers: + trackers.remove(name) + self.registryValue('bugtrackers', value=False).unregister(name) self.shorthand = utils.abbrev(list(self.db.keys())) irc.replySuccess() except KeyError: @@ -221,9 +172,10 @@ class Bugtracker(callbacks.PluginRegexp): self.db[newname] = defined_bugtrackers[tracker.trackertype](newname, tracker.url, d, tracker.aliases) registerBugtracker(newname, tracker.url, d, tracker.trackertype, tracker.aliases) del self.db[oldname] - group = self.registryValue('bugtrackers', value=False) - if oldname in group._children: - group.unregister(oldname) + trackers = self.registryValue('bugtrackers') + if oldname in trackers: + trackers.remove(oldname) + self.registryValue('bugtrackers', value=False).unregister(oldname) self.shorthand = utils.abbrev(list(self.db.keys())) for a in tracker.aliases: self.aliases[a] = newname @@ -246,11 +198,12 @@ class Bugtracker(callbacks.PluginRegexp): return alias = alias.lower() self.db[name].aliases.add(alias) - trackers = self.registryValue('bugtrackers', value=False)._children - if name not in trackers: + trackers = self.registryValue('bugtrackers') + if name in trackers: + self.registryValue('bugtrackers', value=False).get(name).aliases().add(alias) + else: tracker = self.db[name] registerBugtracker(tracker.name, tracker.url, tracker.description, tracker.trackertype, tracker.aliases) - trackers[name].aliases().add(alias) self.aliases[alias] = name irc.replySuccess() except KeyError: @@ -265,13 +218,12 @@ class Bugtracker(callbacks.PluginRegexp): """ try: name = self.shorthand[name.lower()] - c = 'bugtrackers.%s.aliases' % name.replace('.','\\.') alias = alias.lower() try: self.db[name].aliases.remove(alias) - trackers = self.registryValue('bugtrackers', value=False)._children + trackers = self.registryValue('bugtrackers') if name in trackers: - trackers[name].aliases().remove(alias) + self.registryValue('bugtrackers', value=False).get(name).aliases().remove(alias) del self.aliases[alias] irc.replySuccess() except ValueError: @@ -310,15 +262,16 @@ class Bugtracker(callbacks.PluginRegexp): Reset defined bugtrackers to defaults. If is specified, reset only that bugtracker. """ - group = self.registryValue('bugtrackers', value=False) if name: try: name = self.shorthand[name.lower()] for a in self.db[name].aliases: del self.aliases[a] del self.db[name] - if name in group._children: - group.unregister(name) + trackers = self.registryValue('bugtrackers') + if name in trackers: + trackers.remove(name) + self.registryValue('bugtrackers', value=False).unregister(name) if name in default_bugtrackers: (url, description, trackertype, aliases) = default_bugtrackers[name] if trackertype in defined_bugtrackers: @@ -327,15 +280,17 @@ class Bugtracker(callbacks.PluginRegexp): self.aliases[a] = name registerBugtracker(name, url, description, trackertype, aliases) else: - supylog.warning("Bugtracker: Unknown trackertype: %s (%s)" % (trackers[name].trackertype(), name)) + supylog.warning("Bugtracker: Unknown trackertype '%s' (%s)" % (trackers[name].trackertype(), name)) self.shorthand = utils.abbrev(list(self.db.keys())) except KeyError: s = self.registryValue('replyNoBugtracker', msg.args[0] if ircutils.isChannel(msg.args[0]) else None) irc.error(s % name) return else: - for name in list(group._children.keys())[:]: - group.unregister(name) + trackers = self.registryValue('bugtrackers') + for name in list(trackers)[:]: + trackers.remove(name) + self.registryValue('bugtrackers', value=False).unregister(name) self.set_trackers() irc.replySuccess() reset = wrap(reset, [('checkCapability', 'admin'), additional('text')]) @@ -425,10 +380,10 @@ class Bugtracker(callbacks.PluginRegexp): try: report = self.get_bug(channel or msg.nick, tracker, bugtype, bugid, self.registryValue('showassignee', channel), self.registryValue('extended', channel), do_tracker=showTracker) - except BugNotFoundError: + except trackers.BugNotFoundError: if self.registryValue('replyWhenNotFound'): irc.error("Could not find %s bug %d" % (tracker.description, bugid)) - except BugtrackerError as e: + except trackers.BugtrackerError as e: if self.registryValue('replyWhenError') and sure_bug: irc.error(str(e)) else: @@ -456,10 +411,10 @@ class Bugtracker(callbacks.PluginRegexp): return report = self.get_bug(channel or msg.nick, tracker, 'url', bugid, self.registryValue('showassignee', channel), self.registryValue('extended', channel), do_url=False) - except BugNotFoundError: + except trackers.BugNotFoundError: if self.registryValue('replyWhenNotFound'): irc.error("Could not find %s bug %s" % (tracker.description, match.group('bug'))) - except BugtrackerError as e: + except trackers.BugtrackerError as e: if self.registryValue('replyWhenError'): irc.error(str(e)) else: @@ -472,7 +427,7 @@ class Bugtracker(callbacks.PluginRegexp): channel = msg.args[0] if ircutils.isChannel(msg.args[0]) else None if checkAddressed(msg.args[1].strip(), channel): return - if not self.registryValue('bugSnarfer', channel) or not self.registryValue('oopsSnarfer', channel): + if not (self.registryValue('bugSnarfer', channel) and self.registryValue('oopsSnarfer', channel)): return oopsid = match.group('oopsid') if not self.is_ok(channel or msg.nick, 'lpoops', oopsid): @@ -485,29 +440,26 @@ class Bugtracker(callbacks.PluginRegexp): channel = msg.args[0] if ircutils.isChannel(msg.args[0]) else None if checkAddressed(msg.args[1].strip(), channel): return - if not self.registryValue('bugSnarfer', channel) or not self.registryValue('cveSnarfer', channel): + if not (self.registryValue('bugSnarfer', channel) and self.registryValue('cveSnarfer', channel)): return cveid = match.group('cveid').replace(' ','-') if not self.is_ok(channel or msg.nick, 'cve', cveid): return - url = 'https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-%s' % cveid - try: - cvedata = utils.web.getUrl(url).decode('utf-8') - except Exception as e: - raise BugtrackerError('Could not get CVE data: %s (%s)' % (e, url)) - m = cvere.search(cvedata) - if m: - cve = utils.web.htmlToText(m.group('cve'), tagReplace='') - if len(cve) > 380: - cve = cve[:380] + '...' - if not match.group(1): - cve += ' <%s>' % url - irc.reply(cve) + if not match.group(1): + do_url = True else: - m = cverre.search(cvedata) - if m: - cverr = utils.web.htmlToText(m.group('cverr'), tagReplace='') - irc.reply(cverr) + do_url = False + try: + report = trackers.CVE().get_bug(cveid, do_url) + except trackers.BugNotFoundError: + if self.registryValue('replyWhenNotFound'): + irc.error("Could not find CVE %s" % cveid) + except trackers.BugtrackerError as e: + if self.registryValue('replyWhenError'): + irc.error(str(e)) + else: + if report: + irc.reply(report) #TODO: As we will depend on launchpadlib, we should consider using lazr.uri.URI to do URL parsing def get_tracker(self, snarfurl, bugid): @@ -525,25 +477,25 @@ class Bugtracker(callbacks.PluginRegexp): # No tracker found, bummer. Let's try and get one if 'show_bug.cgi' in snarfurl: - tracker = Bugzilla().get_tracker(snarfurl) + tracker = trackers.Bugzilla().get_tracker(snarfurl) elif 'sourceforge.net' in snarfurl: - tracker = SourceForge().get_tracker(snarfurl) + tracker = trackers.SourceForge().get_tracker(snarfurl) elif 'github.com' in snarfurl: - tracker = GitHub().get_tracker(snarfurl) + tracker = trackers.GitHub().get_tracker(snarfurl) elif re.match(r'[^\s/]+/[^\s/]+(/[^\s/]+)+/-/(issues|merge_requests)', snarfurl) \ or re.match(r'[^\s/]+/[^\s/]+(/[^\s/]+)+/merge_requests', snarfurl) \ or re.match(r'[^\s/]+/[^\s/]+(/[^\s/]+){2,}/issues', snarfurl): - tracker = GitLab().get_tracker(snarfurl, bugid) + tracker = trackers.GitLab().get_tracker(snarfurl, bugid) elif re.match(r'[^\s/]+/[^\s/]+/[^\s/]+/issues', snarfurl): - tracker = GitLab().get_tracker(snarfurl, bugid) + tracker = trackers.GitLab().get_tracker(snarfurl, bugid) if not tracker: - tracker = Gitea().get_tracker(snarfurl, bugid) + tracker = trackers.Gitea().get_tracker(snarfurl, bugid) elif re.match(r'[^\s/]+/[^\s/]+/[^\s/]+/pulls', snarfurl): - tracker = Gitea().get_tracker(snarfurl, bugid) + tracker = trackers.Gitea().get_tracker(snarfurl, bugid) elif 'view.php' in snarfurl: - tracker = Mantis().get_tracker(snarfurl) + tracker = trackers.Mantis().get_tracker(snarfurl) elif '/ticket/' in snarfurl: - tracker = Trac().get_tracker(snarfurl) + tracker = trackers.Trac().get_tracker(snarfurl) else: return @@ -617,580 +569,4 @@ class Bugtracker(callbacks.PluginRegexp): return report -# Define all bugtrackers -class IBugtracker: - def __init__(self, name=None, url=None, description=None, trackertype=None, aliases=[]): - self.name = name - self.url = url - self.description = description - self.trackertype = trackertype - self.aliases = set(aliases) - self.errget = 'Could not get data from %s: %s (%s)' - self.errparse = 'Could not parse data from %s: %s (%s)' - - def get_bug(self, bugid): - raise BugTrackerError("Bugtracker class does not implement get_bug") - - def get_tracker(self, url): - raise BugTrackerError("Bugtracker class does not implement get_tracker") - - def __str__(self): - return '%s(%s)' % (self.__class__.__name__, self.url) - - def __hash__(self): - return hash(self.url) - - def __cmp__(self, other): # used implicitly in Bugtracker.is_ok() - return cmp(hash(self), hash(other)) - - def __str__(self): - return self.name - -class Bugzilla(IBugtracker): - def get_tracker(self, url): - try: - match = re.match(r'(?P(?P[^\s/]+).*)/show_bug\.cgi', url) - desc = match.group('desc') - name = desc.lower() - url = 'https://%s' % match.group('url') - return Bugzilla(name, url, desc, 'bugzilla') - except: - pass - - def get_bug(self, bugtype, bugid): - url = "%s/rest/bug/%d" % (self.url, bugid) - try: - bugjson = utils.web.getUrl(url) - bug = json.loads(bugjson.decode('utf-8'))['bugs'][0] - except Exception as e: - # For old-stable Bugzilla - if 'HTTP Error 404' in str(e): - return self.get_bug_old(bugtype, bugid) - raise BugtrackerError(self.errget % (self.description, e, url)) - try: - status = bug['status'] - if bug['resolution']: - status += ': %s' % bug['resolution'] - if bug['assigned_to_detail']: - assignee = bug['assigned_to_detail']['real_name'] - if not assignee: - assignee = bug['assigned_to_detail']['name'] - else: - assignee = '' - return (bugid, bug['product'], bug['summary'], bug['severity'], status, assignee, - "%s/show_bug.cgi?id=%d" % (self.url, bugid), [], []) - except Exception as e: - raise BugtrackerError(self.errparse % (self.description, e, url)) - - def get_bug_old(self, bugtype, bugid): # Deprecated - url = "%s/show_bug.cgi?id=%d&ctype=xml" % (self.url, bugid) - try: - bugxml = utils.web.getUrl(url) - zilladom = minidom.parseString(bugxml) - except Exception as e: - raise BugtrackerError(self.errget % (self.description, e, url)) - bug_n = zilladom.getElementsByTagName('bug')[0] - if bug_n.hasAttribute('error'): - errtxt = bug_n.getAttribute('error') - if errtxt in ('NotFound', 'InvalidBugId'): - raise BugNotFoundError - s = 'Could not get %s bug #%d: %s' % (self.description, bugid, errtxt) - raise BugtrackerError(s) - try: - title = _getnodetxt(bug_n.getElementsByTagName('short_desc')[0]) - status = _getnodetxt(bug_n.getElementsByTagName('bug_status')[0]) - try: - status = "%s: %s" % (status, _getnodetxt(bug_n.getElementsByTagName('resolution')[0])) - except: - pass - product = _getnodetxt(bug_n.getElementsByTagName('product')[0]) - severity = _getnodetxt(bug_n.getElementsByTagName('bug_severity')[0]) - try: - assignee = _getnodeattr(bug_n.getElementsByTagName('assigned_to')[0], 'name') - except: - try: - assignee = _getnodetxt(bug_n.getElementsByTagName('assigned_to')[0]) - except: - assignee = '' - except Exception as e: - raise BugtrackerError(self.errparse % (self.description, e, url)) - return (bugid, product, title, severity, status, assignee, "%s/show_bug.cgi?id=%d" % (self.url, bugid), [], []) - -class Launchpad(IBugtracker): - statuses = ("Unknown", "Invalid", "Opinion", "Won't Fix", "Fix Released", "Fix Committed", "New", - "Incomplete", "Confirmed", "Triaged", "In Progress") - severities = ("Unknown", "Undecided", "Wishlist", "Low", "Medium", "High", "Critical") - - def __init__(self, *args, **kwargs): - IBugtracker.__init__(self, *args, **kwargs) - self.lp = None - - # A word to the wise: - # The Launchpad API is much better than the /+text interface we currently use, - # it's faster and easier to get the information we need. - # The current /+text interface is not really maintained by Launchpad and most, - # or all, of the Launchpad developers hate it. For this reason, we are dropping - # support for /+text in the future in favour of launchpadlib. - # Terence Simpson (tsimpson) 2010-04-20 - - try: - from launchpadlib.launchpad import Launchpad - cachedir = os.path.join(conf.supybot.directories.data.tmp(), 'launchpadlib') - self.lp = Launchpad.login_anonymously("Ubuntu Bots - Bugtracker", 'production', cachedir, version='devel') - except ImportError: - supylog.warning("Please install python-launchpadlib, the old interface is deprecated") - except Exception: - self.lp = None - supylog.exception("Unknown exception while accessing the Launchpad API") - - def _parse(self, task): # Deprecated - parser = FeedParser() - parser.feed(task) - return parser.close() - - @classmethod - def _rank(cls, task): - try: - return float('%d.%02d' % (cls.statuses.index(task.status), - cls.severities.index(task.importance))) - except: - return 0 - - @classmethod - def _rank_old(cls, task): - try: - return float('%d.%02d' % (cls.statuses.index(task['status']), - cls.severities.index(task['importance']))) - except: - return 0 - - @classmethod - def _sort(cls, task1, task2): # Deprecated - try: - if task1.status != task2.status: - if cls.statuses.index(task1.status) < cls.statuses.index(task2.status): - return -1 - return 1 - - if task1.importance != task2.importance: - if cls.severities.index(task1.importance) < cls.severities.index(task2.importance): - return -1 - return 1 - except: - return 0 - return 0 - - @classmethod - def _sort_old(cls, task1, task2): # Deprecated - try: - if task1['status'] != task2['status']: - if cls.statuses.index(task1['status']) < cls.statuses.index(task2['status']): - return -1 - return 1 - - if task1['importance'] != task2['importance']: - if cls.severities.index(task1['importance']) < cls.severities.index(task2['importance']): - return -1 - return 1 - except: - return 0 - return 0 - - def get_bug(self, bugtype, bugid): #TODO: Remove this method and rename 'get_bug_new' to 'get_bug' - if self.lp: - return self.get_bug_new(bugtype, bugid) - return self.get_bug_old(bugtype, bugid) - - def get_bug_new(self, bugtype, bugid): #TODO: Rename this method to 'get_bug' - try: - bugdata = self.lp.bugs[bugid] - if bugdata.private: - raise BugtrackerError("This bug is private") - duplicate = [] - dup = bugdata.duplicate_of - while dup: - duplicate.append(str(bugdata.id)) - bugdata = dup - dup = bugdata.duplicate_of - - extinfo = ['affected: %d' % bugdata.users_affected_count_with_dupes] - extinfo.append('heat: %d' % bugdata.heat) - tasks = bugdata.bug_tasks - - if tasks.total_size > 1: - taskdata = sorted(tasks, key=self._rank)[-1] - else: - taskdata = tasks[0] - - if taskdata.assignee: - assignee = taskdata.assignee.display_name - else: - assignee = '' - - except Exception as e: - if type(e).__name__ == 'HTTPError': # messy, but saves trying to import lazr.restfulclient.errors.HTPError - if e.response.status == 404: - bugNo = e.content.split()[-1][2:-1] # extract the real bug number - if bugNo != str(bugid): # A duplicate of a private bug, at least we know it exists - raise BugtrackerError('Bug #%d is a duplicate of bug #%s, but it is private (%s/bugs/%s)' % (bugid, bugNo, self.url, bugNo)) - raise BugtrackerError("Bug #%d is private or does not exist (%s/bugs/%d)" % (bugid, self.url, bugid)) # Could be private, could just not exist - raise BugtrackerError(self.errget % (self.description, e, '%s/bugs/%d' % (self.url, bugid))) - elif isinstance(e, KeyError): - raise BugNotFoundError - raise BugtrackerError(self.errget % (self.description, e, '%s/bugs/%d' % (self.url, bugid))) - - return (bugdata.id, taskdata.bug_target_display_name, bugdata.title, taskdata.importance, taskdata.status, - assignee, "%s/bugs/%d" % (self.url, bugdata.id), extinfo, duplicate) - - def get_bug_old(self, bugtype, bugid, duplicate=None): # Deprecated - try: - bugdata = utils.web.getUrl("%s/bugs/%d/+text" % (self.url, bugid)).decode('utf-8') - except Exception as e: - if 'HTTP Error 404' in str(e): - if duplicate: - raise BugtrackerError('Bug #%d is a duplicate of bug #%d, but it is private (%s/bugs/%d)' % (duplicate, bugid, self.url, bugid)) - else: - raise BugNotFoundError - raise BugtrackerError(self.errget % (self.description, e, '%s/bugs/%d' % (self.url, bugid))) - - try: - # Split bug data into separate pieces (bug data, task data) - data = bugdata.split('\n\nContent-Type:', 1)[0].split('\n\n') - bugdata = self._parse(data[0]) - if not bugdata['duplicate-of']: - taskdata = list(map(self._parse, data[1:])) - if len(taskdata) > 1: - taskdata = sorted(taskdata, key=self._rank_old)[-1] - else: - taskdata = taskdata[0] - if taskdata['assignee']: - assignee = re.sub(r' \([^)]*\)$', '', taskdata['assignee']) - else: - assignee = '' - except Exception as e: - raise BugtrackerError(self.errparse % (self.description, e, '%s/bugs/%d' % (self.url, bugid))) - - # Try and find duplicates - if bugdata['duplicate-of']: - data = self.get_bug_old(bugtype, int(bugdata['duplicate-of']), duplicate or bugid) - data[8].append(bugdata['bug']) - return data - - return (bugid, taskdata['task'], bugdata['title'], taskdata['importance'], taskdata['status'], - assignee, "%s/bugs/%d" % (self.url, bugid), [], []) - -# -# Debbugs sucks donkeyballs -# * HTML pages are inconsistent -# * Parsing mboxes gets incorrect with cloning perversions (eg with bug 330000) -# * No sane way of accessing bug reports in a machine readable way (bts2ldap -# has no search on bugid) -# * The damn thing allow incomplete bugs, eg bugs without severity set. WTF?!? -# -# Fortunately bugs.donarmstrong.com has a SOAP interface which we can use. -# -class Debbugs(IBugtracker): - def __init__(self, *args, **kwargs): - IBugtracker.__init__(self, *args, **kwargs) - self.soap_client = SoapClient("%s/cgi-bin/soap.cgi" % self.url, namespace="Debbugs/SOAP") - - def get_bug(self, bugtype, bugid): - url = "%s/cgi-bin/bugreport.cgi?bug=%d" % (self.url, bugid) - try: - raw = self.soap_client.get_status(bugs=bugid) - except Exception as e: - raise BugtrackerError(self.errget % (self.description, e, url)) - if not hasattr(raw, 'item'): - raise BugNotFoundError - try: - raw = raw.item.value - if str(raw.fixed_versions): - status = 'Fixed' - else: - status = 'Open' - return (bugid, str(raw.package), str(raw.subject), str(raw.severity), status, '', "%s/%d" % (self.url, bugid), [], []) - except Exception as e: - raise BugtrackerError(self.errparse % (self.description, e, url)) - -class SourceForge(IBugtracker): - def get_tracker(self, url): - try: - match = re.match(r'sourceforge\.net/p/[^\s/]+/(bugs|feature-requests|patches|todo)', url) - desc = match.group(0) - name = desc.lower() - url = 'https://%s' % desc - return SourceForge(name, url, desc, 'sourceforge') - except: - pass - - def get_bug(self, bugtype, bugid): - url = "%s/%d/" % (self.url.replace('sourceforge.net', 'sourceforge.net/rest'), bugid) - try: - bugjson = utils.web.getUrl(url) - bug = json.loads(bugjson.decode('utf-8'))['ticket'] - except Exception as e: - raise BugtrackerError(self.errget % (self.description, e, url)) - try: - product = severity = '' - if bug['labels']: - product = bug['labels'][0] - if '_priority' in bug['custom_fields']: - severity = 'Pri: %s' % bug['custom_fields']['_priority'] - return (bugid, product, bug['summary'], severity, ': '.join(bug['status'].split('-')), - bug['assigned_to'], "%s/%d/" % (self.url, bugid), [], []) - except Exception as e: - raise BugtrackerError(self.errparse % (self.description, e, url)) - -class GitHub(IBugtracker): - def get_tracker(self, url): - try: - match = re.match(r'github\.com/[^\s/]+/[^\s/]+/(issues|pulls?)', url) - desc = match.group(0) - url = 'https://%s' % desc - # Pulls are inconsistent in main and single page URLs - desc = re.sub(r'/pull$', r'/pulls', desc) - name = desc.lower() - return GitHub(name, url, desc, 'github') - except: - pass - - def get_bug(self, bugtype, bugid): - url = "%s/%d" % (self.url.replace('github.com', 'api.github.com/repos'), bugid) - # Pulls are inconsistent in web and API URLs - url = url.replace('/pull/', '/pulls/') - if bugtype in ('issue', 'bug'): - url = url.replace('/pulls/', '/issues/') - elif bugtype in ('pull', 'pr', 'merge', 'mr'): - url = url.replace('/issues/', '/pulls/') - try: - bugjson = utils.web.getUrl(url) - bug = json.loads(bugjson.decode('utf-8')) - except Exception as e: - raise BugtrackerError(self.errget % (self.description, e, url)) - try: - product = '/'.join(self.url.split('/')[-3:-1]) - if 'merged' in bug and bug['merged']: - status = 'Merged' - else: - status = bug['state'] - if bug['assignee']: - assignee = bug['assignee']['login'] - else: - assignee = '' - return (bugid, product, bug['title'], '', status, assignee, bug['html_url'], [], []) - except Exception as e: - raise BugtrackerError(self.errparse % (self.description, e, url)) - -class GitLab(IBugtracker): - def get_tracker(self, url, bugid): - try: - match = re.match(r'[^\s/]+/(?P[^\s/]+/[^\s/]+(/[^\s/]+)*?)/(-/)?(issues|merge_requests)', url) - desc = match.group(0) - name = desc.lower() - url = 'https://%s' % desc - bugurl = "%s/%d" % (re.sub(r'(://[^\s/]+)/[^\s/]+(/[^\s/]+)+/(-/)?', - r'\g<1>/api/v4/projects/%s/' % match.group('project').replace('/', '%2F'), url), bugid) - bugjson = utils.web.getUrl(bugurl) - bug = json.loads(bugjson.decode('utf-8')) - return GitLab(name, url, desc, 'gitlab') - except: - pass - - def get_bug(self, bugtype, bugid): - match = re.match(r'[^\s:]+://[^\s/]+/(?P[^\s/]+/[^\s/]+(/[^\s/]+)*?)/(-/)?(issues|merge_requests)', self.url) - url = "%s/%d" % (re.sub(r'(://[^\s/]+)/[^\s/]+(/[^\s/]+)+/(-/)?', - r'\g<1>/api/v4/projects/%s/' % match.group('project').replace('/', '%2F'), self.url), bugid) - if bugtype in ('issue', 'bug'): - url = url.replace('/merge_requests/', '/issues/') - elif bugtype in ('merge', 'mr', 'pull', 'pr'): - url = url.replace('/issues/', '/merge_requests/') - try: - bugjson = utils.web.getUrl(url) - bug = json.loads(bugjson.decode('utf-8')) - except Exception as e: - raise BugtrackerError(self.errget % (self.description, e, url)) - try: - product = match.group('project') - status = bug['state'] - if bug['assignees']: - assino = len(bug['assignees']) - if assino == 1: - assignee = bug['assignees'][0]['name'] - else: - assignee = '%d people' % assino - else: - assignee = '' - return (bugid, product, bug['title'], '', status, assignee, bug['web_url'], [], []) - except Exception as e: - raise BugtrackerError(self.errparse % (self.description, e, url)) - -class Gitea(IBugtracker): - def get_tracker(self, url, bugid): - try: - match = re.match(r'[^\s/]+/[^\s/]+/[^\s/]+/(issues|pulls)', url) - desc = match.group(0) - name = desc.lower() - url = 'https://%s' % desc - bugurl = '%s/%d' % (re.sub(r'(://[^\s/]+)/', r'\g<1>/api/v1/repos/', url), bugid) - bugjson = utils.web.getUrl(bugurl) - bug = json.loads(bugjson.decode('utf-8')) - return Gitea(name, url, desc, 'gitea') - except: - pass - - def get_bug(self, bugtype, bugid): - url = "%s/%d" % (re.sub(r'(://[^\s/]+)/', r'\g<1>/api/v1/repos/', self.url), bugid) - if bugtype in ('issue', 'bug'): - url = url.replace('/pulls/', '/issues/') - elif bugtype in ('pull', 'pr', 'merge', 'mr'): - url = url.replace('/issues/', '/pulls/') - try: - bugjson = utils.web.getUrl(url) - bug = json.loads(bugjson.decode('utf-8')) - except Exception as e: - raise BugtrackerError(self.errget % (self.description, e, url)) - try: - product = '/'.join(self.url.split('/')[-3:-1]) - if 'merged' in bug and bug['merged']: - status = 'Merged' - else: - status = bug['state'] - if bug['assignee']: - assignee = bug['assignee']['username'] - else: - assignee = '' - # Issues have no 'html_url', but pulls do - if 'html_url' in bug: - htmlurl = bug['html_url'] - else: - htmlurl = url.replace('/api/v1/repos/', '/') - return (bugid, product, bug['title'], '', status, assignee, htmlurl, [], []) - except Exception as e: - raise BugtrackerError(self.errparse % (self.description, e, url)) - -class Mantis(IBugtracker): - def __init__(self, *args, **kwargs): - IBugtracker.__init__(self, *args, **kwargs) - self.soap_client = SoapClient("%s/api/soap/mantisconnect.php" % self.url, namespace="http://futureware.biz/mantisconnect") - - def get_tracker(self, url): - try: - match = re.match(r'(?P(?P[^\s/]+).*)/view\.php', url) - desc = match.group('desc') - name = desc.lower() - url = 'https://%s' % match.group('url') - return Mantis(name, url, desc, 'mantis') - except: - pass - - def get_bug(self, bugtype, bugid): - url = "%s/api/rest/issues/%d" % (self.url, bugid) - try: - bugjson = utils.web.getUrl(url) - bug = json.loads(bugjson.decode('utf-8'))['issues'][0] - except Exception as e: - # REST API may not be enabled yet - if 'HTTP Error 404' in str(e): - return self.get_bug_old(bugtype, bugid) - raise BugtrackerError(self.errget % (self.description, e, url)) - try: - return (bugid, bug['project']['name'], bug['summary'], bug['severity']['name'], bug['resolution']['name'], '', url, [], []) - except Exception as e: - raise BugtrackerError(self.errparse % (self.description, e, url)) - - def get_bug_old(self, bugtype, bugid): # Deprecated - url = "%s/view.php?id=%d" % (self.url, bugid) - try: - raw = self.soap_client.mc_issue_get(username='', password='', issue_id=bugid) - except Exception as e: - if 'Issue #%d not found' % bugid in str(e): - raise BugNotFoundError - # Often SOAP is not enabled - if '.' in self.name: - supylog.exception(self.errget % (self.description, e, url)) - return - raise BugtrackerError(self.errget % (self.description, e, url)) - if not hasattr(raw, 'id'): - raise BugNotFoundError - try: - return (bugid, str(raw.project.name), str(raw.summary), str(raw.severity.name), str(raw.resolution.name), '', url, [], []) - except Exception as e: - raise BugtrackerError(self.errparse % (self.description, e, url)) - -# For Trac-based trackers we get the tab-separated-values format. -# The other option is a comma-separated-values format, but if the description -# has commas, things get tricky. -# This should be more robust than the screen scraping done previously. -class Trac(IBugtracker): - def get_tracker(self, url): - try: - match = re.match(r'(?P[^\s/]+).*/ticket', url) - desc = match.group('desc') - name = desc.lower() - url = 'https://%s' % match.group(0) - return Trac(name, url, desc, 'trac') - except: - pass - - def get_bug(self, bugtype, bugid): # This is still a little rough, but it works :) - url = "%s/%d" % (self.url, bugid) - try: - raw = utils.web.getUrl("%s?format=tab" % url).decode('utf-8') - except Exception as e: - # Due to unreliable matching - if '.' in self.name: - supylog.exception(self.errget % (self.description, e, url)) - return - if 'HTTP Error 500' in str(e): - raise BugNotFoundError - raise BugtrackerError(self.errget % (self.description, e, url)) - try: - raw = raw.replace('\r\n', '\n') - (headers, rest) = raw.split('\n', 1) - headers = headers.strip().split('\t') - rest = rest.strip().split('\t') - - title = rest[headers.index("summary")] - status = rest[headers.index("status")] - package = rest[headers.index("component")] - severity = assignee = "" - if "severity" in headers: - severity = rest[headers.index("severity")] - elif "priority" in headers: - severity = rest[headers.index("priority")] - if "owner" in headers: - assignee = rest[headers.index("owner")] - return (bugid, package, title, severity, status, assignee, url, [], []) - except Exception as e: - # Due to unreliable matching - if '.' in self.name: - supylog.exception(self.errparse % (self.description, e, url)) - return - raise BugtrackerError(self.errparse % (self.description, e, url)) - -# Introspection is quite cool -defined_bugtrackers = {} -v = vars() -for k in list(v.keys()): - if type(v[k]) == type(IBugtracker) and issubclass(v[k], IBugtracker) and not (v[k] == IBugtracker): - defined_bugtrackers[k.lower()] = v[k] - -default_bugtrackers = { - 'mozilla': ('https://bugzilla.mozilla.org', 'Mozilla', 'bugzilla', []), - 'gtk': ('https://gitlab.gnome.org/GNOME/gtk/issues', 'GTK', 'gitlab', []), - 'kde': ('https://bugs.kde.org', 'KDE', 'bugzilla', []), - 'xfce': ('https://bugzilla.xfce.org', 'Xfce', 'bugzilla', []), - 'lxde': ('https://sourceforge.net/p/lxde/bugs', 'LXDE', 'sourceforge', []), - 'freedesktop': ('https://bugzilla.freedesktop.org', 'Freedesktop', 'bugzilla', []), - 'freedesktop2': ('https://bugs.freedesktop.org', 'Freedesktop', 'bugzilla', []), - 'openoffice': ('https://bz.apache.org/ooo', 'OpenOffice', 'bugzilla', []), - 'launchpad': ('https://launchpad.net', 'Launchpad', 'launchpad', ['lp', 'ubuntu', 'ubottu']), - 'debian': ('https://bugs.debian.org', 'Debian', 'debbugs', []), - 'supybot': ('https://sourceforge.net/p/supybot/bugs', 'Supybot', 'sourceforge', []), - 'irssi': ('https://github.com/irssi/irssi/issues', 'Irssi', 'github', []), - 'mantis': ('https://www.mantisbt.org/bugs', 'Mantis', 'mantis', []), - 'trac': ('https://trac.edgewall.org/ticket', 'Trac', 'trac', []), - 'pidgin': ('https://developer.pidgin.im/ticket', 'Pidgin', 'trac', []) -} - Class = Bugtracker diff --git a/Bugtracker/trackers.py b/Bugtracker/trackers.py new file mode 100644 index 0000000..150227c --- /dev/null +++ b/Bugtracker/trackers.py @@ -0,0 +1,632 @@ +# -*- Encoding: utf-8 -*- +### +# Copyright (c) 2005-2007 Dennis Kaarsemaker +# Copyright (c) 2008-2010 Terence Simpson +# Copyright (c) 2017- Krytarik Raido +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of version 2 of the GNU General Public License as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +### + +import supybot.utils as utils +import supybot.conf as conf +import supybot.log as supylog + +import re, os, json +import xml.dom.minidom as minidom +from email.parser import FeedParser +from pysimplesoap.client import SoapClient + +def _getnodetxt(node): + L = [] + for childnode in node.childNodes: + if childnode.nodeType == childnode.TEXT_NODE: + L.append(childnode.data) + if not L: + raise ValueError("No text nodes") + val = ''.join(L) + if node.hasAttribute('encoding'): + encoding = node.getAttribute('encoding') + if encoding == 'base64': + try: + val = val.decode('base64') + except: + val = 'Cannot convert bug data from base64.' + return utils.web.htmlToText(val, tagReplace='') + +def _getnodeattr(node, attr): + if node.hasAttribute(attr): + val = node.getAttribute(attr) + else: + raise ValueError("No such attribute") + return utils.web.htmlToText(val, tagReplace='') + +class BugtrackerError(Exception): + """A bugtracker error""" + pass + +class BugNotFoundError(Exception): + """Pity, bug isn't there""" + pass + +cvere = re.compile(r']*>Description.*?]*>\s*(?P.*?)\s*', re.I | re.DOTALL) +cverre = re.compile(r']*>\s*(?P.*?)\s*', re.I | re.DOTALL) +# Define CVE tracker +class CVE: + def get_bug(self, cveid, do_url=True): + url = "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-%s" % cveid + try: + cvedata = utils.web.getUrl(url).decode('utf-8') + except Exception as e: + raise BugtrackerError('Could not get CVE data: %s (%s)' % (e, url)) + m = cvere.search(cvedata) + if m: + cve = utils.web.htmlToText(m.group('cve'), tagReplace='') + if len(cve) > 380: + cve = cve[:380] + '...' + if do_url: + cve += ' <%s>' % url + return cve + else: + m = cverre.search(cvedata) + if m: + cverr = utils.web.htmlToText(m.group('cverr'), tagReplace='') + if "Couldn't find" in cverr: + raise BugNotFoundError + return cverr + +# Define all bugtrackers +class IBugtracker: + def __init__(self, name=None, url=None, description=None, trackertype=None, aliases=[]): + self.name = name + self.url = url + self.description = description + self.trackertype = trackertype + self.aliases = set(aliases) + self.errget = 'Could not get data from %s: %s (%s)' + self.errparse = 'Could not parse data from %s: %s (%s)' + + def __str__(self): + return self.name + + def __hash__(self): + return hash(self.url) + + def __cmp__(self, other): # used implicitly in Bugtracker.is_ok() + return cmp(hash(self), hash(other)) + +class Bugzilla(IBugtracker): + def get_tracker(self, url): + try: + match = re.match(r'(?P(?P[^\s/]+).*)/show_bug\.cgi', url) + desc = match.group('desc') + name = desc.lower() + url = 'https://%s' % match.group('url') + return Bugzilla(name, url, desc, 'bugzilla') + except: + pass + + def get_bug(self, bugtype, bugid): + url = "%s/rest/bug/%d" % (self.url, bugid) + try: + bugjson = utils.web.getUrl(url) + bug = json.loads(bugjson.decode('utf-8'))['bugs'][0] + except Exception as e: + # For old-stable Bugzilla + if 'HTTP Error 404' in str(e): + return self.get_bug_old(bugtype, bugid) + raise BugtrackerError(self.errget % (self.description, e, url)) + try: + status = bug['status'] + if bug['resolution']: + status += ': %s' % bug['resolution'] + if bug['assigned_to_detail']: + assignee = bug['assigned_to_detail']['real_name'] + if not assignee: + assignee = bug['assigned_to_detail']['name'] + else: + assignee = '' + return (bugid, bug['product'], bug['summary'], bug['severity'], status, assignee, + "%s/show_bug.cgi?id=%d" % (self.url, bugid), [], []) + except Exception as e: + raise BugtrackerError(self.errparse % (self.description, e, url)) + + def get_bug_old(self, bugtype, bugid): # Deprecated + url = "%s/show_bug.cgi?id=%d&ctype=xml" % (self.url, bugid) + try: + bugxml = utils.web.getUrl(url) + zilladom = minidom.parseString(bugxml) + except Exception as e: + raise BugtrackerError(self.errget % (self.description, e, url)) + bug_n = zilladom.getElementsByTagName('bug')[0] + if bug_n.hasAttribute('error'): + errtxt = bug_n.getAttribute('error') + if errtxt in ('NotFound', 'InvalidBugId'): + raise BugNotFoundError + s = 'Could not get %s bug #%d: %s' % (self.description, bugid, errtxt) + raise BugtrackerError(s) + try: + title = _getnodetxt(bug_n.getElementsByTagName('short_desc')[0]) + status = _getnodetxt(bug_n.getElementsByTagName('bug_status')[0]) + try: + status = "%s: %s" % (status, _getnodetxt(bug_n.getElementsByTagName('resolution')[0])) + except: + pass + product = _getnodetxt(bug_n.getElementsByTagName('product')[0]) + severity = _getnodetxt(bug_n.getElementsByTagName('bug_severity')[0]) + try: + assignee = _getnodeattr(bug_n.getElementsByTagName('assigned_to')[0], 'name') + except: + try: + assignee = _getnodetxt(bug_n.getElementsByTagName('assigned_to')[0]) + except: + assignee = '' + except Exception as e: + raise BugtrackerError(self.errparse % (self.description, e, url)) + return (bugid, product, title, severity, status, assignee, "%s/show_bug.cgi?id=%d" % (self.url, bugid), [], []) + +class Launchpad(IBugtracker): + statuses = ("Unknown", "Invalid", "Opinion", "Won't Fix", "Fix Released", "Fix Committed", "New", + "Incomplete", "Confirmed", "Triaged", "In Progress") + severities = ("Unknown", "Undecided", "Wishlist", "Low", "Medium", "High", "Critical") + + def __init__(self, *args, **kwargs): + IBugtracker.__init__(self, *args, **kwargs) + self.lp = None + + # A word to the wise: + # The Launchpad API is much better than the /+text interface we currently use, + # it's faster and easier to get the information we need. + # The current /+text interface is not really maintained by Launchpad and most, + # or all, of the Launchpad developers hate it. For this reason, we are dropping + # support for /+text in the future in favour of launchpadlib. + # Terence Simpson (tsimpson) 2010-04-20 + + try: + from launchpadlib.launchpad import Launchpad + cachedir = os.path.join(conf.supybot.directories.data.tmp(), 'launchpadlib') + self.lp = Launchpad.login_anonymously("Ubuntu Bots - Bugtracker", 'production', cachedir, version='devel') + except ImportError: + supylog.warning("Please install python-launchpadlib, the old interface is deprecated") + except Exception: + self.lp = None + supylog.exception("Unknown exception while accessing the Launchpad API") + + def _parse(self, task): # Deprecated + parser = FeedParser() + parser.feed(task) + return parser.close() + + @classmethod + def _rank(cls, task): + try: + return float('%d.%02d' % (cls.statuses.index(task.status), + cls.severities.index(task.importance))) + except: + return 0 + + @classmethod + def _rank_old(cls, task): + try: + return float('%d.%02d' % (cls.statuses.index(task['status']), + cls.severities.index(task['importance']))) + except: + return 0 + + @classmethod + def _sort(cls, task1, task2): # Deprecated + try: + if task1.status != task2.status: + if cls.statuses.index(task1.status) < cls.statuses.index(task2.status): + return -1 + return 1 + + if task1.importance != task2.importance: + if cls.severities.index(task1.importance) < cls.severities.index(task2.importance): + return -1 + return 1 + except: + return 0 + return 0 + + @classmethod + def _sort_old(cls, task1, task2): # Deprecated + try: + if task1['status'] != task2['status']: + if cls.statuses.index(task1['status']) < cls.statuses.index(task2['status']): + return -1 + return 1 + + if task1['importance'] != task2['importance']: + if cls.severities.index(task1['importance']) < cls.severities.index(task2['importance']): + return -1 + return 1 + except: + return 0 + return 0 + + def get_bug(self, bugtype, bugid): #TODO: Remove this method and rename 'get_bug_new' to 'get_bug' + if self.lp: + return self.get_bug_new(bugtype, bugid) + return self.get_bug_old(bugtype, bugid) + + def get_bug_new(self, bugtype, bugid): #TODO: Rename this method to 'get_bug' + try: + bugdata = self.lp.bugs[bugid] + if bugdata.private: + raise BugtrackerError("This bug is private") + duplicate = [] + dup = bugdata.duplicate_of + while dup: + duplicate.append(str(bugdata.id)) + bugdata = dup + dup = bugdata.duplicate_of + + extinfo = ['affected: %d' % bugdata.users_affected_count_with_dupes] + extinfo.append('heat: %d' % bugdata.heat) + tasks = bugdata.bug_tasks + + if tasks.total_size > 1: + taskdata = sorted(tasks, key=self._rank)[-1] + else: + taskdata = tasks[0] + + if taskdata.assignee: + assignee = taskdata.assignee.display_name + else: + assignee = '' + + except Exception as e: + if type(e).__name__ == 'HTTPError': # messy, but saves trying to import lazr.restfulclient.errors.HTPError + if e.response.status == 404: + bugNo = e.content.split()[-1][2:-1] # extract the real bug number + if bugNo != str(bugid): # A duplicate of a private bug, at least we know it exists + raise BugtrackerError('Bug #%d is a duplicate of bug #%s, but it is private (%s/bugs/%s)' % (bugid, bugNo, self.url, bugNo)) + raise BugtrackerError("Bug #%d is private or does not exist (%s/bugs/%d)" % (bugid, self.url, bugid)) # Could be private, could just not exist + raise BugtrackerError(self.errget % (self.description, e, '%s/bugs/%d' % (self.url, bugid))) + elif isinstance(e, KeyError): + raise BugNotFoundError + raise BugtrackerError(self.errget % (self.description, e, '%s/bugs/%d' % (self.url, bugid))) + + return (bugdata.id, taskdata.bug_target_display_name, bugdata.title, taskdata.importance, taskdata.status, + assignee, "%s/bugs/%d" % (self.url, bugdata.id), extinfo, duplicate) + + def get_bug_old(self, bugtype, bugid, duplicate=None): # Deprecated + try: + bugdata = utils.web.getUrl("%s/bugs/%d/+text" % (self.url, bugid)).decode('utf-8') + except Exception as e: + if 'HTTP Error 404' in str(e): + if duplicate: + raise BugtrackerError('Bug #%d is a duplicate of bug #%d, but it is private (%s/bugs/%d)' % (duplicate, bugid, self.url, bugid)) + else: + raise BugNotFoundError + raise BugtrackerError(self.errget % (self.description, e, '%s/bugs/%d' % (self.url, bugid))) + + try: + # Split bug data into separate pieces (bug data, task data) + data = bugdata.split('\n\nContent-Type:', 1)[0].split('\n\n') + bugdata = self._parse(data[0]) + if not bugdata['duplicate-of']: + taskdata = list(map(self._parse, data[1:])) + if len(taskdata) > 1: + taskdata = sorted(taskdata, key=self._rank_old)[-1] + else: + taskdata = taskdata[0] + if taskdata['assignee']: + assignee = re.sub(r' \([^)]*\)$', '', taskdata['assignee']) + else: + assignee = '' + except Exception as e: + raise BugtrackerError(self.errparse % (self.description, e, '%s/bugs/%d' % (self.url, bugid))) + + # Try and find duplicates + if bugdata['duplicate-of']: + data = self.get_bug_old(bugtype, int(bugdata['duplicate-of']), duplicate or bugid) + data[8].append(bugdata['bug']) + return data + + return (bugid, taskdata['task'], bugdata['title'], taskdata['importance'], taskdata['status'], + assignee, "%s/bugs/%d" % (self.url, bugid), [], []) + +# +# Debbugs sucks donkeyballs +# * HTML pages are inconsistent +# * Parsing mboxes gets incorrect with cloning perversions (eg with bug 330000) +# * No sane way of accessing bug reports in a machine readable way (bts2ldap +# has no search on bugid) +# * The damn thing allow incomplete bugs, eg bugs without severity set. WTF?!? +# +# Fortunately bugs.donarmstrong.com has a SOAP interface which we can use. +# +class Debbugs(IBugtracker): + def __init__(self, *args, **kwargs): + IBugtracker.__init__(self, *args, **kwargs) + self.soap_client = SoapClient("%s/cgi-bin/soap.cgi" % self.url, namespace="Debbugs/SOAP") + + def get_bug(self, bugtype, bugid): + url = "%s/cgi-bin/bugreport.cgi?bug=%d" % (self.url, bugid) + try: + raw = self.soap_client.get_status(bugs=bugid) + except Exception as e: + raise BugtrackerError(self.errget % (self.description, e, url)) + if not hasattr(raw, 'item'): + raise BugNotFoundError + try: + raw = raw.item.value + if str(raw.fixed_versions): + status = 'Fixed' + else: + status = 'Open' + return (bugid, str(raw.package), str(raw.subject), str(raw.severity), status, '', "%s/%d" % (self.url, bugid), [], []) + except Exception as e: + raise BugtrackerError(self.errparse % (self.description, e, url)) + +class SourceForge(IBugtracker): + def get_tracker(self, url): + try: + match = re.match(r'sourceforge\.net/p/[^\s/]+/(bugs|feature-requests|patches|todo)', url) + desc = match.group(0) + name = desc.lower() + url = 'https://%s' % desc + return SourceForge(name, url, desc, 'sourceforge') + except: + pass + + def get_bug(self, bugtype, bugid): + url = "%s/%d/" % (self.url.replace('sourceforge.net', 'sourceforge.net/rest'), bugid) + try: + bugjson = utils.web.getUrl(url) + bug = json.loads(bugjson.decode('utf-8'))['ticket'] + except Exception as e: + raise BugtrackerError(self.errget % (self.description, e, url)) + try: + product = severity = '' + if bug['labels']: + product = bug['labels'][0] + if '_priority' in bug['custom_fields']: + severity = 'Pri: %s' % bug['custom_fields']['_priority'] + return (bugid, product, bug['summary'], severity, ': '.join(bug['status'].split('-')), + bug['assigned_to'], "%s/%d/" % (self.url, bugid), [], []) + except Exception as e: + raise BugtrackerError(self.errparse % (self.description, e, url)) + +class GitHub(IBugtracker): + def get_tracker(self, url): + try: + match = re.match(r'github\.com/[^\s/]+/[^\s/]+/(issues|pulls?)', url) + desc = match.group(0) + url = 'https://%s' % desc + # Pulls are inconsistent in main and single page URLs + desc = re.sub(r'/pull$', r'/pulls', desc) + name = desc.lower() + return GitHub(name, url, desc, 'github') + except: + pass + + def get_bug(self, bugtype, bugid): + url = "%s/%d" % (self.url.replace('github.com', 'api.github.com/repos'), bugid) + # Pulls are inconsistent in web and API URLs + url = url.replace('/pull/', '/pulls/') + if bugtype in ('issue', 'bug'): + url = url.replace('/pulls/', '/issues/') + elif bugtype in ('pull', 'pr', 'merge', 'mr'): + url = url.replace('/issues/', '/pulls/') + try: + bugjson = utils.web.getUrl(url) + bug = json.loads(bugjson.decode('utf-8')) + except Exception as e: + raise BugtrackerError(self.errget % (self.description, e, url)) + try: + product = '/'.join(self.url.split('/')[-3:-1]) + if 'merged' in bug and bug['merged']: + status = 'Merged' + else: + status = bug['state'] + if bug['assignee']: + assignee = bug['assignee']['login'] + else: + assignee = '' + return (bugid, product, bug['title'], '', status, assignee, bug['html_url'], [], []) + except Exception as e: + raise BugtrackerError(self.errparse % (self.description, e, url)) + +class GitLab(IBugtracker): + def get_tracker(self, url, bugid): + try: + match = re.match(r'[^\s/]+/(?P[^\s/]+/[^\s/]+(/[^\s/]+)*?)/(-/)?(issues|merge_requests)', url) + desc = match.group(0) + name = desc.lower() + url = 'https://%s' % desc + bugurl = "%s/%d" % (re.sub(r'(://[^\s/]+)/[^\s/]+(/[^\s/]+)+/(-/)?', + r'\g<1>/api/v4/projects/%s/' % match.group('project').replace('/', '%2F'), url), bugid) + bugjson = utils.web.getUrl(bugurl) + bug = json.loads(bugjson.decode('utf-8')) + return GitLab(name, url, desc, 'gitlab') + except: + pass + + def get_bug(self, bugtype, bugid): + match = re.match(r'[^\s:]+://[^\s/]+/(?P[^\s/]+/[^\s/]+(/[^\s/]+)*?)/(-/)?(issues|merge_requests)', self.url) + url = "%s/%d" % (re.sub(r'(://[^\s/]+)/[^\s/]+(/[^\s/]+)+/(-/)?', + r'\g<1>/api/v4/projects/%s/' % match.group('project').replace('/', '%2F'), self.url), bugid) + if bugtype in ('issue', 'bug'): + url = url.replace('/merge_requests/', '/issues/') + elif bugtype in ('merge', 'mr', 'pull', 'pr'): + url = url.replace('/issues/', '/merge_requests/') + try: + bugjson = utils.web.getUrl(url) + bug = json.loads(bugjson.decode('utf-8')) + except Exception as e: + raise BugtrackerError(self.errget % (self.description, e, url)) + try: + product = match.group('project') + status = bug['state'] + if bug['assignees']: + assino = len(bug['assignees']) + if assino == 1: + assignee = bug['assignees'][0]['name'] + else: + assignee = '%d people' % assino + else: + assignee = '' + return (bugid, product, bug['title'], '', status, assignee, bug['web_url'], [], []) + except Exception as e: + raise BugtrackerError(self.errparse % (self.description, e, url)) + +class Gitea(IBugtracker): + def get_tracker(self, url, bugid): + try: + match = re.match(r'[^\s/]+/[^\s/]+/[^\s/]+/(issues|pulls)', url) + desc = match.group(0) + name = desc.lower() + url = 'https://%s' % desc + bugurl = '%s/%d' % (re.sub(r'(://[^\s/]+)/', r'\g<1>/api/v1/repos/', url), bugid) + bugjson = utils.web.getUrl(bugurl) + bug = json.loads(bugjson.decode('utf-8')) + return Gitea(name, url, desc, 'gitea') + except: + pass + + def get_bug(self, bugtype, bugid): + url = "%s/%d" % (re.sub(r'(://[^\s/]+)/', r'\g<1>/api/v1/repos/', self.url), bugid) + if bugtype in ('issue', 'bug'): + url = url.replace('/pulls/', '/issues/') + elif bugtype in ('pull', 'pr', 'merge', 'mr'): + url = url.replace('/issues/', '/pulls/') + try: + bugjson = utils.web.getUrl(url) + bug = json.loads(bugjson.decode('utf-8')) + except Exception as e: + raise BugtrackerError(self.errget % (self.description, e, url)) + try: + product = '/'.join(self.url.split('/')[-3:-1]) + if 'merged' in bug and bug['merged']: + status = 'Merged' + else: + status = bug['state'] + if bug['assignee']: + assignee = bug['assignee']['username'] + else: + assignee = '' + # Issues have no 'html_url', but pulls do + if 'html_url' in bug: + htmlurl = bug['html_url'] + else: + htmlurl = url.replace('/api/v1/repos/', '/') + return (bugid, product, bug['title'], '', status, assignee, htmlurl, [], []) + except Exception as e: + raise BugtrackerError(self.errparse % (self.description, e, url)) + +class Mantis(IBugtracker): + def __init__(self, *args, **kwargs): + IBugtracker.__init__(self, *args, **kwargs) + self.soap_client = SoapClient("%s/api/soap/mantisconnect.php" % self.url, namespace="http://futureware.biz/mantisconnect") + + def get_tracker(self, url): + try: + match = re.match(r'(?P(?P[^\s/]+).*)/view\.php', url) + desc = match.group('desc') + name = desc.lower() + url = 'https://%s' % match.group('url') + return Mantis(name, url, desc, 'mantis') + except: + pass + + def get_bug(self, bugtype, bugid): + url = "%s/api/rest/issues/%d" % (self.url, bugid) + try: + bugjson = utils.web.getUrl(url) + bug = json.loads(bugjson.decode('utf-8'))['issues'][0] + except Exception as e: + # REST API may not be enabled yet + if 'HTTP Error 404' in str(e): + return self.get_bug_old(bugtype, bugid) + raise BugtrackerError(self.errget % (self.description, e, url)) + try: + return (bugid, bug['project']['name'], bug['summary'], bug['severity']['name'], bug['resolution']['name'], '', url, [], []) + except Exception as e: + raise BugtrackerError(self.errparse % (self.description, e, url)) + + def get_bug_old(self, bugtype, bugid): # Deprecated + url = "%s/view.php?id=%d" % (self.url, bugid) + try: + raw = self.soap_client.mc_issue_get(username='', password='', issue_id=bugid) + except Exception as e: + if 'Issue #%d not found' % bugid in str(e): + raise BugNotFoundError + # Often SOAP is not enabled + if '.' in self.name: + supylog.exception(self.errget % (self.description, e, url)) + return + raise BugtrackerError(self.errget % (self.description, e, url)) + if not hasattr(raw, 'id'): + raise BugNotFoundError + try: + return (bugid, str(raw.project.name), str(raw.summary), str(raw.severity.name), str(raw.resolution.name), '', url, [], []) + except Exception as e: + raise BugtrackerError(self.errparse % (self.description, e, url)) + +# For Trac-based trackers we get the tab-separated-values format. +# The other option is a comma-separated-values format, but if the description +# has commas, things get tricky. +# This should be more robust than the screen scraping done previously. +class Trac(IBugtracker): + def get_tracker(self, url): + try: + match = re.match(r'(?P[^\s/]+).*/ticket', url) + desc = match.group('desc') + name = desc.lower() + url = 'https://%s' % match.group(0) + return Trac(name, url, desc, 'trac') + except: + pass + + def get_bug(self, bugtype, bugid): # This is still a little rough, but it works :) + url = "%s/%d" % (self.url, bugid) + try: + raw = utils.web.getUrl("%s?format=tab" % url).decode('utf-8') + except Exception as e: + # Due to unreliable matching + if '.' in self.name: + supylog.exception(self.errget % (self.description, e, url)) + return + if 'HTTP Error 500' in str(e): + raise BugNotFoundError + raise BugtrackerError(self.errget % (self.description, e, url)) + try: + raw = raw.replace('\r\n', '\n') + (headers, rest) = raw.split('\n', 1) + headers = headers.strip().split('\t') + rest = rest.strip().split('\t') + + title = rest[headers.index("summary")] + status = rest[headers.index("status")] + package = rest[headers.index("component")] + severity = assignee = "" + if "severity" in headers: + severity = rest[headers.index("severity")] + elif "priority" in headers: + severity = rest[headers.index("priority")] + if "owner" in headers: + assignee = rest[headers.index("owner")] + return (bugid, package, title, severity, status, assignee, url, [], []) + except Exception as e: + # Due to unreliable matching + if '.' in self.name: + supylog.exception(self.errparse % (self.description, e, url)) + return + raise BugtrackerError(self.errparse % (self.description, e, url)) + +# Introspection is quite cool +defined_bugtrackers = {} +v = vars() +for k in list(v.keys()): + if type(v[k]) == type(IBugtracker) and issubclass(v[k], IBugtracker) and not (v[k] == IBugtracker): + defined_bugtrackers[k.lower()] = v[k]