diff options
-rw-r--r-- | contrib/README | 3 | ||||
-rwxr-xr-x | contrib/gnatsparse/README | 44 | ||||
-rwxr-xr-x | contrib/gnatsparse/gnatsparse.py | 804 | ||||
-rwxr-xr-x | contrib/gnatsparse/magic.py | 712 | ||||
-rwxr-xr-x | contrib/gnatsparse/specialuu.py | 104 |
5 files changed, 1667 insertions, 0 deletions
diff --git a/contrib/README b/contrib/README index a34a43ef6..013cf421d 100644 --- a/contrib/README +++ b/contrib/README @@ -12,6 +12,9 @@ This directory includes: mysqld-watcher.pl -- This script can be installed as a frequent cron job to clean up stalled/dead queries. + gnatsparse/ -- A Python script used to import a GNATS database + into Bugzilla. + gnats2bz.pl -- A perl script to help import bugs from a GNATS database into a Bugzilla database. Contributed by Tom Schutter <tom@platte.com> diff --git a/contrib/gnatsparse/README b/contrib/gnatsparse/README new file mode 100755 index 000000000..737bd18eb --- /dev/null +++ b/contrib/gnatsparse/README @@ -0,0 +1,44 @@ +gnatsparse +========== + +Author: Daniel Berlin <dan@dberlin.org> + +gnatsparse is a simple Python program that imports a GNATS database +into a Bugzilla system. It is based on the gnats2bz.pl Perl script +but it's a rewrite at the same time. Its parser is based on gnatsweb, +which gives a 10 times speed improvement compared to the previous code. + +Features +-------- + +* Chunks audit trail into separate comments, with the right From's, times, etc. + +* Handles followup emails that are in the report, with the right From's, times, +etc. + +* Properly handles duplicates, adding the standard bugzilla duplicate message. + +* Extracts and handles gnatsweb attachments, as well as uuencoded attachments +appearing in either followup emails, the how-to-repeat field, etc. Replaces +them with a message to look at the attachments list, and adds the standard +"Created an attachment" message that bugzilla uses. Handling them includes +giving them the right name and mime-type. "attachments" means multiple +uuencoded things/gnatsweb attachments are handled properly. + +* Handles reopened bug reports. + +* Builds the cc list from the people who have commented on the report, +and the reporter. 
+ +Requirements +------------ + +It requires python 2.2+, it won't work with 1.5.2 (Linux distributions +ship with 2.2+ these days, so that shouldn't be an issue). + +Documentation +------------- + +Documentation can be found inside the scripts. The source code is self +documenting. + diff --git a/contrib/gnatsparse/gnatsparse.py b/contrib/gnatsparse/gnatsparse.py new file mode 100755 index 000000000..5f7cde713 --- /dev/null +++ b/contrib/gnatsparse/gnatsparse.py @@ -0,0 +1,804 @@ +try: +# Using Psyco makes it about 25% faster, but there's a bug in psyco in +# handling of eval causing it to use unlimited memory with the magic +# file enabled. +# import psyco +# psyco.full() +# from psyco.classes import * + pass +except: + pass +import re +import base64 +import cStringIO +import specialuu +import array +import email.Utils +import zlib +import magic + +# Comment out if you don't want magic detection +magicf = magic.MagicFile() + +# Open our output file +outfile = open("gnats2bz_data.sql", "w") + +# List of GNATS fields +fieldnames = ("Number", "Category", "Synopsis", "Confidential", "Severity", + "Priority", "Responsible", "State", "Quarter", "Keywords", + "Date-Required", "Class", "Submitter-Id", "Arrival-Date", + "Closed-Date", "Last-Modified", "Originator", "Release", + "Organization", "Environment", "Description", "How-To-Repeat", + "Fix", "Release-Note", "Audit-Trail", "Unformatted") + +# Dictionary telling us which GNATS fields are multiline +multilinefields = {"Organization":1, "Environment":1, "Description":1, + "How-To-Repeat":1, "Fix":1, "Release-Note":1, + "Audit-Trail":1, "Unformatted":1} + +# Mapping of GCC release to version. 
Our version string is updated every release, +# so we need to funnel all releases with 3.4 in the string to be version +# 3.4 for bug tracking purposes +# The key is a regex to match, the value is the version it corresponds +# with +releasetovermap = {r"3\.4":"3.4", r"3\.3":"3.3", r"3\.2\.2":"3.2.2", + r"3\.2\.1":"3.2.1", r"3\.2":"3.2", r"3\.1\.2":"3.1.2", + r"3\.1\.1":"3.1.1", r"3\.1":"3.1", r"3\.0\.4":"3.0.4", + r"3\.0\.3":"3.0.3", r"3\.0\.2":"3.0.2", r"3\.0\.1":"3.0.1", + r"3\.0":"3.0", r"2\.95\.4":"2.95.4", r"2\.95\.3":"2.95.3", + r"2\.95\.2":"2.95.2", r"2\.95\.1":"2.95.1", + r"2\.95":"2.95", r"2\.97":"2.97", + r"2\.96.*[rR][eE][dD].*[hH][aA][tT]":"2.96 (redhat)", + r"2\.96":"2.96"} + +# These map the field name to the field id bugzilla assigns. We need +# the id when doing bug activity. +fieldids = {"State":8, "Responsible":15} + +# These are the keywords we use in gcc bug tracking. They are transformed +# into bugzilla keywords. The format here is <keyword>-><bugzilla keyword id> +keywordids = {"wrong-code":1, "ice-on-legal-code":2, "ice-on-illegal-code":3, + "rejects-legal":4, "accepts-illegal":5, "pessimizes-code":6} + +# Map from GNATS states to Bugzilla states. Duplicates and reopened bugs +# are handled when parsing the audit trail, so no need for them here. 
+state_lookup = {"":"NEW", "open":"ASSIGNED", "analyzed":"ASSIGNED", + "feedback":"WAITING", "closed":"CLOSED", + "suspended":"SUSPENDED"} + +# Table of versions that exist in the bugs, built up as we go along +versions_table = {} + +# Delimiter gnatsweb uses for attachments +attachment_delimiter = "----gnatsweb-attachment----\n" + +# Here starts the various regular expressions we use +# Matches an entire GNATS single line field +gnatfieldre = re.compile(r"""^([>\w\-]+)\s*:\s*(.*)\s*$""") + +# Matches the name of a GNATS field +fieldnamere = re.compile(r"""^>(.*)$""") + +# Matches the useless part of an envelope +uselessre = re.compile(r"""^(\S*?):\s*""", re.MULTILINE) + +# Matches the filename in a content disposition +dispositionre = re.compile("(\\S+);\\s*filename=\"([^\"]+)\"") + +# Matches the last changed date in the entire text of a bug +# If you have other editable fields that get audit trail entries, modify this +# The field names are explicitly listed in order to speed up matching +lastdatere = re.compile(r"""^(?:(?:State|Responsible|Priority|Severity)-Changed-When: )(.+?)$""", re.MULTILINE) + +# Matches the From line of an email or the first line of an audit trail entry +# We use this re to find the begin lines of all the audit trail entries +# The field names are explicitly listed in order to speed up matching +fromtore=re.compile(r"""^(?:(?:State|Responsible|Priority|Severity)-Changed-From-To: |From: )""", re.MULTILINE) + +# These re's match the various parts of an audit trail entry +changedfromtore=re.compile(r"""^(\w+?)-Changed-From-To: (.+?)$""", re.MULTILINE) +changedbyre=re.compile(r"""^\w+?-Changed-By: (.+?)$""", re.MULTILINE) +changedwhenre=re.compile(r"""^\w+?-Changed-When: (.+?)$""", re.MULTILINE) +changedwhyre=re.compile(r"""^\w+?-Changed-Why:\s*(.*?)$""", re.MULTILINE) + +# This re matches audit trail text saying that the current bug is a duplicate of another +duplicatere=re.compile(r"""(?:")?Dup(?:licate)?(?:d)?(?:")? 
of .*?(\d+)""", re.IGNORECASE | re.MULTILINE) + +# Get the text of a From: line +fromre=re.compile(r"""^From: (.*?)$""", re.MULTILINE) + +# Get the text of a Date: Line +datere=re.compile(r"""^Date: (.*?)$""", re.MULTILINE) + +# Map of the responsible file to email addresses +responsible_map = {} +# List of records in the responsible file +responsible_list = [] +# List of records in the categories file +categories_list = [] +# List of pr's in the index +pr_list = [] +# Map usernames to user ids +usermapping = {} +# Start with this user id +userid_base = 2 + +# Name of gnats user +gnats_username = "gnats@gcc.gnu.org" +# Name of unassigned user +unassigned_username = "unassigned@gcc.gnu.org" + +gnats_db_dir = "." +product = "gcc" +productdesc = "GNU Compiler Connection" +milestoneurl = "http://gcc/gnu.org" +defaultmilestone = "3.4" + +def write_non_bug_tables(): + """ Write out the non-bug related tables, such as products, profiles, etc.""" + # Set all non-unconfirmed bugs's everconfirmed flag + print >>outfile, "update bugs set everconfirmed=1 where bug_status != 'UNCONFIRMED';" + + # Set all bugs assigned to the unassigned user to NEW + print >>outfile, "update bugs set bug_status='NEW',assigned_to='NULL' where bug_status='ASSIGNED' AND assigned_to=3;" + + # Insert the products + print >>outfile, "\ninsert into products (" + print >>outfile, " product, description, milestoneurl, disallownew," + print >>outfile, " defaultmilestone, votestoconfirm) values (" + print >>outfile, " '%s', '%s', '%s', 0, '%s', 1);" % (product, + productdesc, + milestoneurl, + defaultmilestone) + + # Insert the components + for category in categories_list: + component = SqlQuote(category[0]) + productstr = SqlQuote(product) + description = SqlQuote(category[1]) + initialowner = SqlQuote("3") + print >>outfile, "\ninsert into components ("; + print >>outfile, " value, program, initialowner, initialqacontact," + print >>outfile, " description) values (" + print >>outfile, " %s, %s, %s, '', 
%s);" % (component, productstr, + initialowner, description) + + # Insert the versions + for productstr, version_list in versions_table.items(): + productstr = SqlQuote(productstr) + for version in version_list: + version = SqlQuote(version) + print >>outfile, "\ninsert into versions (value, program) " + print >>outfile, " values (%s, %s);" % (version, productstr) + + # Insert the users + for username, userid in usermapping.items(): + realname = map_username_to_realname(username) + username = SqlQuote(username) + realname = SqlQuote(realname) + print >>outfile, "\ninsert into profiles (" + print >>outfile, " userid, login_name, password, cryptpassword, realname, groupset" + print >>outfile, ") values (" + print >>outfile, "%s,%s,'password',encrypt('password'), %s, 0);" % (userid, username, realname) + print >>outfile, "update profiles set groupset=1 << 32 where login_name like '%\@gcc.gnu.org';" + +def unixdate2datetime(unixdate): + """ Convert a unix date to a datetime value """ + year, month, day, hour, min, sec, x, x, x, x = email.Utils.parsedate_tz(unixdate) + return "%d-%02d-%02d %02d:%02d:%02d" % (year,month,day,hour,min,sec) + +def unixdate2timestamp(unixdate): + """ Convert a unix date to a timestamp value """ + year, month, day, hour, min, sec, x, x, x, x = email.Utils.parsedate_tz(unixdate) + return "%d%02d%02d%02d%02d%02d" % (year,month,day,hour,min,sec) + +def SqlQuote(str): + """ Perform SQL quoting on a string """ + return "'%s'" % str.replace("'", """''""").replace("\\", "\\\\").replace("\0","\\0") + +def convert_gccver_to_ver(gccver): + """ Given a gcc version, convert it to a Bugzilla version. 
""" + for k in releasetovermap.keys(): + if re.search(".*%s.*" % k, gccver) is not None: + return releasetovermap[k] + result = re.search(r""".*(\d\.\d) \d+ \(experimental\).*""", gccver) + if result is not None: + return result.group(1) + return "unknown" + +def load_index(fname): + """ Load in the GNATS index file """ + global pr_list + ifp = open(fname) + for record in ifp.xreadlines(): + fields = record.split("|") + pr_list.append(fields[0]) + ifp.close() + +def load_categories(fname): + """ Load in the GNATS categories file """ + global categories_list + cfp = open(fname) + for record in cfp.xreadlines(): + if re.search("^#", record) is not None: + continue + categories_list.append(record.split(":")) + cfp.close() + +def map_username_to_realname(username): + """ Given a username, find the real name """ + name = username + name = re.sub("@.*", "", name) + for responsible_record in responsible_list: + if responsible_record[0] == name: + return responsible_record[1] + if len(responsible_record) > 2: + if responsible_record[2] == username: + return responsible_record[1] + return "" + + +def get_userid(responsible): + """ Given an email address, get the user id """ + global responsible_map + global usermapping + global userid_base + if responsible is None: + return -1 + responsible = responsible.lower() + responsible = re.sub("sources.redhat.com", "gcc.gnu.org", responsible) + if responsible_map.has_key(responsible): + responsible = responsible_map[responsible] + if usermapping.has_key(responsible): + return usermapping[responsible] + else: + usermapping[responsible] = userid_base + userid_base += 1 + return usermapping[responsible] + +def load_responsible(fname): + """ Load in the GNATS responsible file """ + global responsible_map + global responsible_list + rfp = open(fname) + for record in rfp.xreadlines(): + if re.search("^#", record) is not None: + continue + split_record = record.split(":") + responsible_map[split_record[0]] = split_record[2].rstrip() + 
responsible_list.append(record.split(":")) + rfp.close() + +def split_csl(list): + """ Split a comma seperated list """ + newlist = re.split(r"""\s*,\s*""", list) + return newlist + +def fix_email_addrs(addrs): + """ Perform various fixups and cleaning on an e-mail address """ + addrs = split_csl(addrs) + trimmed_addrs = [] + for addr in addrs: + addr = re.sub(r"""\(.*\)""","",addr) + addr = re.sub(r""".*<(.*)>.*""","\\1",addr) + addr = addr.rstrip() + addr = addr.lstrip() + trimmed_addrs.append(addr) + addrs = ", ".join(trimmed_addrs) + return addrs + +class Bugzillabug(object): + """ Class representing a bugzilla bug """ + def __init__(self, gbug): + """ Initialize a bugzilla bug from a GNATS bug. """ + self.bug_id = gbug.bug_id + self.long_descs = [] + self.bug_ccs = [get_userid("gcc-bugs@gcc.gnu.org")] + self.bug_activity = [] + self.attachments = gbug.attachments + self.gnatsfields = gbug.fields + self.need_unformatted = gbug.has_unformatted_attach == 0 + self.need_unformatted &= gbug.fields.has_key("Unformatted") + self.translate_pr() + self.update_versions() + if self.fields.has_key("Audit-Trail"): + self.parse_audit_trail() + self.write_bug() + + def parse_fromto(type, string): + """ Parses the from and to parts of a changed-from-to line """ + fromstr = "" + tostr = "" + + # Some slightly messed up changed lines have unassigned-new, + # instead of unassigned->new. So we make the > optional. 
+ result = re.search(r"""(.*)-(?:>?)(.*)""", string) + + # Only know how to handle parsing of State and Responsible + # changed-from-to right now + if type == "State": + fromstr = state_lookup[result.group(1)] + tostr = state_lookup[result.group(2)] + elif type == "Responsible": + if result.group(1) != "": + fromstr = result.group(1) + if result.group(2) != "": + tostr = result.group(2) + if responsible_map.has_key(fromstr): + fromstr = responsible_map[fromstr] + if responsible_map.has_key(tostr): + tostr = responsible_map[tostr] + return (fromstr, tostr) + parse_fromto = staticmethod(parse_fromto) + + def parse_audit_trail(self): + """ Parse a GNATS audit trail """ + trail = self.fields["Audit-Trail"] + # Begin to split the audit trail into pieces + result = fromtore.finditer(trail) + starts = [] + ends = [] + pieces = [] + # Make a list of the pieces + for x in result: + pieces.append (x) + # Find the start and end of each piece + if len(pieces) > 0: + for x in xrange(len(pieces)-1): + starts.append(pieces[x].start()) + ends.append(pieces[x+1].start()) + starts.append(pieces[-1].start()) + ends.append(len(trail)) + pieces = [] + # Now make the list of actual text of the pieces + for x in xrange(len(starts)): + pieces.append(trail[starts[x]:ends[x]]) + # And parse the actual pieces + for piece in pieces: + result = changedfromtore.search(piece) + # See what things we actually have inside this entry, and + # handle them approriately + if result is not None: + type = result.group(1) + changedfromto = result.group(2) + # If the bug was reopened, mark it as such + if changedfromto.find("closed->analyzed") != -1: + if self.fields["bug_status"] == "'NEW'": + self.fields["bug_status"] = "'REOPENED'" + if type == "State" or type == "Responsible": + oldstate, newstate = self.parse_fromto (type, changedfromto) + result = changedbyre.search(piece) + if result is not None: + changedby = result.group(1) + result = changedwhenre.search(piece) + if result is not None: + 
changedwhen = result.group(1) + changedwhen = unixdate2datetime(changedwhen) + changedwhen = SqlQuote(changedwhen) + result = changedwhyre.search(piece) + changedwhy = piece[result.start(1):] + #changedwhy = changedwhy.lstrip() + changedwhy = changedwhy.rstrip() + changedby = get_userid(changedby) + # Put us on the cc list if we aren't there already + if changedby != self.fields["userid"] \ + and changedby not in self.bug_ccs: + self.bug_ccs.append(changedby) + # If it's a duplicate, mark it as such + result = duplicatere.search(changedwhy) + if result is not None: + newtext = "*** This bug has been marked as a duplicate of %s ***" % result.group(1) + newtext = SqlQuote(newtext) + self.long_descs.append((self.bug_id, changedby, + changedwhen, newtext)) + self.fields["bug_status"] = "'RESOLVED'" + self.fields["resolution"] = "'DUPLICATE'" + self.fields["userid"] = changedby + else: + newtext = "%s-Changed-From-To: %s\n%s-Changed-Why: %s\n" % (type, changedfromto, type, changedwhy) + newtext = SqlQuote(newtext) + self.long_descs.append((self.bug_id, changedby, + changedwhen, newtext)) + if type == "State" or type == "Responsible": + newstate = SqlQuote("%s" % newstate) + oldstate = SqlQuote("%s" % oldstate) + fieldid = fieldids[type] + self.bug_activity.append((newstate, oldstate, fieldid, changedby, changedwhen)) + + else: + # It's an email + result = fromre.search(piece) + if result is None: + continue + fromstr = result.group(1) + fromstr = fix_email_addrs(fromstr) + fromstr = get_userid(fromstr) + result = datere.search(piece) + if result is None: + continue + datestr = result.group(1) + datestr = SqlQuote(unixdate2timestamp(datestr)) + if fromstr != self.fields["userid"] \ + and fromstr not in self.bug_ccs: + self.bug_ccs.append(fromstr) + self.long_descs.append((self.bug_id, fromstr, datestr, + SqlQuote(piece))) + + + + def write_bug(self): + """ Output a bug to the data file """ + fields = self.fields + print >>outfile, "\ninsert into bugs(" + print >>outfile, 
" bug_id, assigned_to, bug_severity, priority, bug_status, creation_ts, delta_ts," + print >>outfile, " short_desc," + print >>outfile, " reporter, version," + print >>outfile, " product, component, resolution, target_milestone, qa_contact," + print >>outfile, " gccbuild, gcctarget, gcchost, keywords" + print >>outfile, " ) values (" + print >>outfile, "%s, %s, %s, %s, %s, %s, %s," % (self.bug_id, fields["userid"], fields["bug_severity"], fields["priority"], fields["bug_status"], fields["creation_ts"], fields["delta_ts"]) + print >>outfile, "%s," % (fields["short_desc"]) + print >>outfile, "%s, %s," % (fields["reporter"], fields["version"]) + print >>outfile, "%s, %s, %s, %s, 0," %(fields["product"], fields["component"], fields["resolution"], fields["target_milestone"]) + print >>outfile, "%s, %s, %s, %s" % (fields["gccbuild"], fields["gcctarget"], fields["gcchost"], fields["keywords"]) + print >>outfile, ");" + if self.fields["keywords"] != 0: + print >>outfile, "\ninsert into keywords (bug_id, keywordid) values (" + print >>outfile, " %s, %s);" % (self.bug_id, fields["keywordid"]) + for id, who, when, text in self.long_descs: + print >>outfile, "\ninsert into longdescs (" + print >>outfile, " bug_id, who, bug_when, thetext) values(" + print >>outfile, " %s, %s, %s, %s);" % (id, who, when, text) + for name, data, who in self.attachments: + print >>outfile, "\ninsert into attachments (" + print >>outfile, " bug_id, filename, description, mimetype, ispatch, submitter_id, thedata) values (" + ftype = None + # It's *magic*! 
+ if name.endswith(".ii") == 1: + ftype = "text/x-c++" + elif name.endswith(".i") == 1: + ftype = "text/x-c" + else: + ftype = magicf.detect(cStringIO.StringIO(data)) + if ftype is None: + ftype = "application/octet-stream" + + print >>outfile, "%s,%s,%s, %s,0, %s,%s);" %(self.bug_id, SqlQuote(name), SqlQuote(name), SqlQuote (ftype), who, SqlQuote(zlib.compress(data))) + for newstate, oldstate, fieldid, changedby, changedwhen in self.bug_activity: + print >>outfile, "\ninsert into bugs_activity (" + print >>outfile, " bug_id, who, bug_when, fieldid, added, removed) values (" + print >>outfile, " %s, %s, %s, %s, %s, %s);" % (self.bug_id, + changedby, + changedwhen, + fieldid, + newstate, + oldstate) + for cc in self.bug_ccs: + print >>outfile, "\ninsert into cc(bug_id, who) values (%s, %s);" %(self.bug_id, cc) + def update_versions(self): + """ Update the versions table to account for the version on this bug """ + global versions_table + if self.fields.has_key("Release") == 0 \ + or self.fields.has_key("Category") == 0: + return + curr_product = "gcc" + curr_version = self.fields["Release"] + if curr_version == "": + return + curr_version = convert_gccver_to_ver (curr_version) + if versions_table.has_key(curr_product) == 0: + versions_table[curr_product] = [] + for version in versions_table[curr_product]: + if version == curr_version: + return + versions_table[curr_product].append(curr_version) + def translate_pr(self): + """ Transform a GNATS PR into a Bugzilla bug """ + self.fields = self.gnatsfields + if (self.fields.has_key("Organization") == 0) \ + or self.fields["Organization"].find("GCC"): + self.fields["Originator"] = "" + self.fields["Organization"] = "" + self.fields["Organization"].lstrip() + if (self.fields.has_key("Release") == 0) \ + or self.fields["Release"] == "" \ + or self.fields["Release"].find("unknown-1.0") != -1: + self.fields["Release"]="unknown" + if self.fields.has_key("Responsible"): + result = re.search(r"""\w+""", 
self.fields["Responsible"]) + self.fields["Responsible"] = "%s%s" % (result.group(0), "@gcc.gnu.org") + self.fields["gcchost"] = "" + self.fields["gcctarget"] = "" + self.fields["gccbuild"] = "" + if self.fields.has_key("Environment"): + result = re.search("^host: (.+?)$", self.fields["Environment"], + re.MULTILINE) + if result is not None: + self.fields["gcchost"] = result.group(1) + result = re.search("^target: (.+?)$", self.fields["Environment"], + re.MULTILINE) + if result is not None: + self.fields["gcctarget"] = result.group(1) + result = re.search("^build: (.+?)$", self.fields["Environment"], + re.MULTILINE) + if result is not None: + self.fields["gccbuild"] = result.group(1) + self.fields["userid"] = get_userid(self.fields["Responsible"]) + self.fields["bug_severity"] = "normal" + if self.fields["Class"] == "change-request": + self.fields["bug_severity"] = "enhancement" + elif self.fields.has_key("Severity"): + if self.fields["Severity"] == "critical": + self.fields["bug_severity"] = "critical" + elif self.fields["Severity"] == "serious": + self.fields["bug_severity"] = "major" + elif self.fields.has_key("Synopsis"): + if re.search("crash|assert", self.fields["Synopsis"]): + self.fields["bug_severity"] = "critical" + elif re.search("wrong|error", self.fields["Synopsis"]): + self.fields["bug_severity"] = "major" + self.fields["bug_severity"] = SqlQuote(self.fields["bug_severity"]) + self.fields["keywords"] = 0 + if keywordids.has_key(self.fields["Class"]): + self.fields["keywords"] = self.fields["Class"] + self.fields["keywordid"] = keywordids[self.fields["Class"]] + self.fields["keywords"] = SqlQuote(self.fields["keywords"]) + self.fields["priority"] = "P1" + if self.fields.has_key("Severity") and self.fields.has_key("Priority"): + severity = self.fields["Severity"] + priority = self.fields["Priority"] + if severity == "critical": + if priority == "high": + self.fields["priority"] = "P1" + else: + self.fields["priority"] = "P2" + elif severity == "serious": 
+ if priority == "low": + self.fields["priority"] = "P4" + else: + self.fields["priority"] = "P3" + else: + if priority == "high": + self.fields["priority"] = "P4" + else: + self.fields["priority"] = "P5" + self.fields["priority"] = SqlQuote(self.fields["priority"]) + state = self.fields["State"] + if (state == "open" or state == "analyzed") and self.fields["userid"] != 3: + self.fields["bug_status"] = "ASSIGNED" + self.fields["resolution"] = "" + elif state == "feedback": + self.fields["bug_status"] = "WAITING" + self.fields["resolution"] = "" + elif state == "closed": + self.fields["bug_status"] = "CLOSED" + if self.fields.has_key("Class"): + theclass = self.fields["Class"] + if theclass.find("duplicate") != -1: + self.fields["resolution"]="DUPLICATE" + elif theclass.find("mistaken") != -1: + self.fields["resolution"]="INVALID" + else: + self.fields["resolution"]="FIXED" + else: + self.fields["resolution"]="FIXED" + elif state == "suspended": + self.fields["bug_status"] = "SUSPENDED" + self.fields["resolution"] = "" + elif state == "analyzed" and self.fields["userid"] == 3: + self.fields["bug_status"] = "NEW" + self.fields["resolution"] = "" + else: + self.fields["bug_status"] = "UNCONFIRMED" + self.fields["resolution"] = "" + self.fields["bug_status"] = SqlQuote(self.fields["bug_status"]) + self.fields["resolution"] = SqlQuote(self.fields["resolution"]) + self.fields["creation_ts"] = "" + if self.fields.has_key("Arrival-Date") and self.fields["Arrival-Date"] != "": + self.fields["creation_ts"] = unixdate2datetime(self.fields["Arrival-Date"]) + self.fields["creation_ts"] = SqlQuote(self.fields["creation_ts"]) + self.fields["delta_ts"] = "" + if self.fields.has_key("Audit-Trail"): + result = lastdatere.findall(self.fields["Audit-Trail"]) + result.reverse() + if len(result) > 0: + self.fields["delta_ts"] = unixdate2timestamp(result[0]) + if self.fields["delta_ts"] == "": + if self.fields.has_key("Arrival-Date") and self.fields["Arrival-Date"] != "": + 
self.fields["delta_ts"] = unixdate2timestamp(self.fields["Arrival-Date"]) + self.fields["delta_ts"] = SqlQuote(self.fields["delta_ts"]) + self.fields["short_desc"] = SqlQuote(self.fields["Synopsis"]) + if self.fields.has_key("Reply-To") and self.fields["Reply-To"] != "": + self.fields["reporter"] = get_userid(self.fields["Reply-To"]) + elif self.fields.has_key("Mail-Header"): + result = re.search(r"""From .*?([\w.]+@[\w.]+)""", self.fields["Mail-Header"]) + if result: + self.fields["reporter"] = get_userid(result.group(1)) + else: + self.fields["reporter"] = get_userid(gnats_username) + else: + self.fields["reporter"] = get_userid(gnats_username) + long_desc = self.fields["Description"] + long_desc2 = "" + for field in ["Release", "Environment", "How-To-Repeat"]: + if self.fields.has_key(field) and self.fields[field] != "": + long_desc += ("\n\n%s:\n" % field) + self.fields[field] + if self.fields.has_key("Fix") and self.fields["Fix"] != "": + long_desc2 = "Fix:\n" + self.fields["Fix"] + if self.need_unformatted == 1 and self.fields["Unformatted"] != "": + long_desc += "\n\nUnformatted:\n" + self.fields["Unformatted"] + if long_desc != "": + self.long_descs.append((self.bug_id, self.fields["reporter"], + self.fields["creation_ts"], + SqlQuote(long_desc))) + if long_desc2 != "": + self.long_descs.append((self.bug_id, self.fields["reporter"], + self.fields["creation_ts"], + SqlQuote(long_desc2))) + for field in ["gcchost", "gccbuild", "gcctarget"]: + self.fields[field] = SqlQuote(self.fields[field]) + self.fields["version"] = "" + if self.fields["Release"] != "": + self.fields["version"] = convert_gccver_to_ver (self.fields["Release"]) + self.fields["version"] = SqlQuote(self.fields["version"]) + self.fields["product"] = SqlQuote("gcc") + self.fields["component"] = "invalid" + if self.fields.has_key("Category"): + self.fields["component"] = self.fields["Category"] + self.fields["component"] = SqlQuote(self.fields["component"]) + self.fields["target_milestone"] = 
"---" + if self.fields["version"].find("3.4") != -1: + self.fields["target_milestone"] = "3.4" + self.fields["target_milestone"] = SqlQuote(self.fields["target_milestone"]) + if self.fields["userid"] == 2: + self.fields["userid"] = "\'NULL\'" + +class GNATSbug(object): + """ Represents a single GNATS PR """ + def __init__(self, filename): + self.attachments = [] + self.has_unformatted_attach = 0 + fp = open (filename) + self.fields = self.parse_pr(fp.xreadlines()) + self.bug_id = int(self.fields["Number"]) + if self.fields.has_key("Unformatted"): + self.find_gnatsweb_attachments() + if self.fields.has_key("How-To-Repeat"): + self.find_regular_attachments("How-To-Repeat") + if self.fields.has_key("Fix"): + self.find_regular_attachments("Fix") + + def get_attacher(fields): + if fields.has_key("Reply-To") and fields["Reply-To"] != "": + return get_userid(fields["Reply-To"]) + else: + result = None + if fields.has_key("Mail-Header"): + result = re.search(r"""From .*?([\w.]+\@[\w.]+)""", + fields["Mail-Header"]) + if result is not None: + reporter = get_userid(result.group(1)) + else: + reporter = get_userid(gnats_username) + get_attacher = staticmethod(get_attacher) + def find_regular_attachments(self, which): + fields = self.fields + while re.search("^begin [0-7]{3}", fields[which], + re.DOTALL | re.MULTILINE): + outfp = cStringIO.StringIO() + infp = cStringIO.StringIO(fields[which]) + filename, start, end = specialuu.decode(infp, outfp, quiet=0) + fields[which]=fields[which].replace(fields[which][start:end], + "See attachments for %s\n" % filename) + self.attachments.append((filename, outfp.getvalue(), + self.get_attacher(fields))) + + def decode_gnatsweb_attachment(self, attachment): + result = re.split(r"""\n\n""", attachment, 1) + if len(result) == 1: + return -1 + envelope, body = result + envelope = uselessre.split(envelope) + envelope.pop(0) + # Turn the list of key, value into a dict of key => value + attachinfo = dict([(envelope[i], envelope[i+1]) for i in 
xrange(0,len(envelope),2)]) + for x in attachinfo.keys(): + attachinfo[x] = attachinfo[x].rstrip() + if (attachinfo.has_key("Content-Type") == 0) or \ + (attachinfo.has_key("Content-Disposition") == 0): + raise ValueError, "Unable to parse file attachment" + result = dispositionre.search(attachinfo["Content-Disposition"]) + filename = result.group(2) + filename = re.sub(".*/","", filename) + filename = re.sub(".*\\\\","", filename) + attachinfo["filename"]=filename + result = re.search("""(\S+);.*""", attachinfo["Content-Type"]) + if result is not None: + attachinfo["Content-Type"] = result.group(1) + if attachinfo.has_key("Content-Transfer-Encoding"): + if attachinfo["Content-Transfer-Encoding"] == "base64": + attachinfo["data"] = base64.decodestring(body) + else: + attachinfo["data"]=body + + return (attachinfo["filename"], attachinfo["data"], + self.get_attacher(self.fields)) + + def find_gnatsweb_attachments(self): + fields = self.fields + attachments = re.split(attachment_delimiter, fields["Unformatted"]) + fields["Unformatted"] = attachments.pop(0) + for attachment in attachments: + result = self.decode_gnatsweb_attachment (attachment) + if result != -1: + self.attachments.append(result) + self.has_unformatted_attach = 1 + def parse_pr(lines): + #fields = {"envelope":[]} + fields = {"envelope":array.array("c")} + hdrmulti = "envelope" + for line in lines: + line = line.rstrip('\n') + line += '\n' + result = gnatfieldre.search(line) + if result is None: + if hdrmulti != "": + if fields.has_key(hdrmulti): + #fields[hdrmulti].append(line) + fields[hdrmulti].fromstring(line) + else: + #fields[hdrmulti] = [line] + fields[hdrmulti] = array.array("c", line) + continue + hdr, arg = result.groups() + ghdr = "*not valid*" + result = fieldnamere.search(hdr) + if result != None: + ghdr = result.groups()[0] + if ghdr in fieldnames: + if multilinefields.has_key(ghdr): + hdrmulti = ghdr + #fields[ghdr] = [""] + fields[ghdr] = array.array("c") + else: + hdrmulti = "" + 
#fields[ghdr] = [arg] + fields[ghdr] = array.array("c", arg) + elif hdrmulti != "": + #fields[hdrmulti].append(line) + fields[hdrmulti].fromstring(line) + if hdrmulti == "envelope" and \ + (hdr == "Reply-To" or hdr == "From" \ + or hdr == "X-GNATS-Notify"): + arg = fix_email_addrs(arg) + #fields[hdr] = [arg] + fields[hdr] = array.array("c", arg) + if fields.has_key("Reply-To") and len(fields["Reply-To"]) > 0: + fields["Reply-To"] = fields["Reply-To"] + else: + fields["Reply-To"] = fields["From"] + if fields.has_key("From"): + del fields["From"] + if fields.has_key("X-GNATS-Notify") == 0: + fields["X-GNATS-Notify"] = array.array("c") + #fields["X-GNATS-Notify"] = "" + for x in fields.keys(): + fields[x] = fields[x].tostring() + #fields[x] = "".join(fields[x]) + for x in fields.keys(): + if multilinefields.has_key(x): + fields[x] = fields[x].rstrip() + + return fields + parse_pr = staticmethod(parse_pr) +load_index("%s/gnats-adm/index" % gnats_db_dir) +load_categories("%s/gnats-adm/categories" % gnats_db_dir) +load_responsible("%s/gnats-adm/responsible" % gnats_db_dir) +get_userid(gnats_username) +get_userid(unassigned_username) +for x in pr_list: + print "Processing %s..." % x + a = GNATSbug ("%s/%s" % (gnats_db_dir, x)) + b = Bugzillabug(a) +write_non_bug_tables() +outfile.close() diff --git a/contrib/gnatsparse/magic.py b/contrib/gnatsparse/magic.py new file mode 100755 index 000000000..049a7e19b --- /dev/null +++ b/contrib/gnatsparse/magic.py @@ -0,0 +1,712 @@ +# Found on a russian zope mailing list, and modified to fix bugs in parsing +# the magic file and string making +# -- Daniel Berlin <dberlin@dberlin.org> +import sys, struct, time, re, exceptions, pprint, stat, os, pwd, grp + +_mew = 0 + +# _magic='/tmp/magic' +# _magic='/usr/share/magic.mime' +_magic='/usr/share/magic.mime' +mime = 1 + +_ldate_adjust = lambda x: time.mktime( time.gmtime(x) ) + +BUFFER_SIZE = 1024 * 128 # 128K should be enough... 
+ +class MagicError(exceptions.Exception): pass + +def _handle(fmt='@x',adj=None): return fmt, struct.calcsize(fmt), adj + +KnownTypes = { + # 'byte':_handle('@b'), + 'byte':_handle('@B'), + 'ubyte':_handle('@B'), + + 'string':('s',0,None), + 'pstring':_handle('p'), + +# 'short':_handle('@h'), +# 'beshort':_handle('>h'), +# 'leshort':_handle('<h'), + 'short':_handle('@H'), + 'beshort':_handle('>H'), + 'leshort':_handle('<H'), + 'ushort':_handle('@H'), + 'ubeshort':_handle('>H'), + 'uleshort':_handle('<H'), + + 'long':_handle('@l'), + 'belong':_handle('>l'), + 'lelong':_handle('<l'), + 'ulong':_handle('@L'), + 'ubelong':_handle('>L'), + 'ulelong':_handle('<L'), + + 'date':_handle('=l'), + 'bedate':_handle('>l'), + 'ledate':_handle('<l'), + 'ldate':_handle('=l',_ldate_adjust), + 'beldate':_handle('>l',_ldate_adjust), + 'leldate':_handle('<l',_ldate_adjust), +} + +_mew_cnt = 0 +def mew(x): + global _mew_cnt + if _mew : + if x=='.' : + _mew_cnt += 1 + if _mew_cnt % 64 == 0 : sys.stderr.write( '\n' ) + sys.stderr.write( '.' 
) + else: + sys.stderr.write( '\b'+x ) + +def has_format(s): + n = 0 + l = None + for c in s : + if c == '%' : + if l == '%' : n -= 1 + else : n += 1 + l = c + return n + +def read_asciiz(file,size=None,pos=None): + s = [] + if pos : + mew('s') + file.seek( pos, 0 ) + mew('z') + if size is not None : + s = [file.read( size ).split('\0')[0]] + else: + while 1 : + c = file.read(1) + if (not c) or (ord(c)==0) or (c=='\n') : break + s.append (c) + mew('Z') + return ''.join(s) + +def a2i(v,base=0): + if v[-1:] in 'lL' : v = v[:-1] + return int( v, base ) + +_cmap = { + '\\' : '\\', + '0' : '\0', +} +for c in range(ord('a'),ord('z')+1) : + try : e = eval('"\\%c"' % chr(c)) + except ValueError : pass + else : _cmap[chr(c)] = e +else: + del c + del e + +def make_string(s): + return eval( '"'+s.replace('"','\\"')+'"') + +class MagicTestError(MagicError): pass + +class MagicTest: + def __init__(self,offset,mtype,test,message,line=None,level=None): + self.line, self.level = line, level + self.mtype = mtype + self.mtest = test + self.subtests = [] + self.mask = None + self.smod = None + self.nmod = None + self.offset, self.type, self.test, self.message = \ + offset,mtype,test,message + if self.mtype == 'true' : return # XXX hack to enable level skips + if test[-1:]=='\\' and test[-2:]!='\\\\' : + self.test += 'n' # looks like someone wanted EOL to match? 
+ if mtype[:6]=='string' : + if '/' in mtype : # for strings + self.type, self.smod = \ + mtype[:mtype.find('/')], mtype[mtype.find('/')+1:] + else: + for nm in '&+-' : + if nm in mtype : # for integer-based + self.nmod, self.type, self.mask = ( + nm, + mtype[:mtype.find(nm)], + # convert mask to int, autodetect base + int( mtype[mtype.find(nm)+1:], 0 ) + ) + break + self.struct, self.size, self.cast = KnownTypes[ self.type ] + def __str__(self): + return '%s %s %s %s' % ( + self.offset, self.mtype, self.mtest, self.message + ) + def __repr__(self): + return 'MagicTest(%s,%s,%s,%s,line=%s,level=%s,subtests=\n%s%s)' % ( + `self.offset`, `self.mtype`, `self.mtest`, `self.message`, + `self.line`, `self.level`, + '\t'*self.level, pprint.pformat(self.subtests) + ) + def run(self,file): + result = '' + do_close = 0 + try: + if type(file) == type('x') : + file = open( file, 'r', BUFFER_SIZE ) + do_close = 1 +# else: +# saved_pos = file.tell() + if self.mtype != 'true' : + data = self.read(file) + last = file.tell() + else: + data = last = None + if self.check( data ) : + result = self.message+' ' + if has_format( result ) : result %= data + for test in self.subtests : + m = test.run(file) + if m is not None : result += m + return make_string( result ) + finally: + if do_close : + file.close() +# else: +# file.seek( saved_pos, 0 ) + def get_mod_and_value(self): + if self.type[-6:] == 'string' : + # "something like\tthis\n" + if self.test[0] in '=<>' : + mod, value = self.test[0], make_string( self.test[1:] ) + else: + mod, value = '=', make_string( self.test ) + else: + if self.test[0] in '=<>&^' : + mod, value = self.test[0], a2i(self.test[1:]) + elif self.test[0] == 'x': + mod = self.test[0] + value = 0 + else: + mod, value = '=', a2i(self.test) + return mod, value + def read(self,file): + mew( 's' ) + file.seek( self.offset(file), 0 ) # SEEK_SET + mew( 'r' ) + try: + data = rdata = None + # XXX self.size might be 0 here... 
+ if self.size == 0 : + # this is an ASCIIZ string... + size = None + if self.test != '>\\0' : # magic's hack for string read... + value = self.get_mod_and_value()[1] + size = (value=='\0') and None or len(value) + rdata = data = read_asciiz( file, size=size ) + else: + rdata = file.read( self.size ) + if not rdata or (len(rdata)!=self.size) : return None + data = struct.unpack( self.struct, rdata )[0] # XXX hack?? + except: + print >>sys.stderr, self + print >>sys.stderr, '@%s struct=%s size=%d rdata=%s' % ( + self.offset, `self.struct`, self.size,`rdata`) + raise + mew( 'R' ) + if self.cast : data = self.cast( data ) + if self.mask : + try: + if self.nmod == '&' : data &= self.mask + elif self.nmod == '+' : data += self.mask + elif self.nmod == '-' : data -= self.mask + else: raise MagicTestError(self.nmod) + except: + print >>sys.stderr,'data=%s nmod=%s mask=%s' % ( + `data`, `self.nmod`, `self.mask` + ) + raise + return data + def check(self,data): + mew('.') + if self.mtype == 'true' : + return '' # not None ! 
+ mod, value = self.get_mod_and_value() + if self.type[-6:] == 'string' : + # "something like\tthis\n" + if self.smod : + xdata = data + if 'b' in self.smod : # all blanks are optional + xdata = ''.join( data.split() ) + value = ''.join( value.split() ) + if 'c' in self.smod : # all blanks are optional + xdata = xdata.upper() + value = value.upper() + # if 'B' in self.smod : # compact blanks + ### XXX sorry, i don't understand this :-( + # data = ' '.join( data.split() ) + # if ' ' not in data : return None + else: + xdata = data + try: + if mod == '=' : result = data == value + elif mod == '<' : result = data < value + elif mod == '>' : result = data > value + elif mod == '&' : result = data & value + elif mod == '^' : result = (data & (~value)) == 0 + elif mod == 'x' : result = 1 + else : raise MagicTestError(self.test) + if result : + zdata, zval = `data`, `value` + if self.mtype[-6:]!='string' : + try: zdata, zval = hex(data), hex(value) + except: zdata, zval = `data`, `value` + if 0 : print >>sys.stderr, '%s @%s %s:%s %s %s => %s (%s)' % ( + '>'*self.level, self.offset, + zdata, self.mtype, `mod`, zval, `result`, + self.message + ) + return result + except: + print >>sys.stderr,'mtype=%s data=%s mod=%s value=%s' % ( + `self.mtype`, `data`, `mod`, `value` + ) + raise + def add(self,mt): + if not isinstance(mt,MagicTest) : + raise MagicTestError((mt,'incorrect subtest type %s'%(type(mt),))) + if mt.level == self.level+1 : + self.subtests.append( mt ) + elif self.subtests : + self.subtests[-1].add( mt ) + elif mt.level > self.level+1 : + # it's possible to get level 3 just after level 1 !!! :-( + level = self.level + 1 + while level < mt.level : + xmt = MagicTest(None,'true','x','',line=self.line,level=level) + self.add( xmt ) + level += 1 + else: + self.add( mt ) # retry... 
+ else: + raise MagicTestError((mt,'incorrect subtest level %s'%(`mt.level`,))) + def last_test(self): + return self.subtests[-1] +#end class MagicTest + +class OffsetError(MagicError): pass + +class Offset: + pos_format = {'b':'<B','B':'>B','s':'<H','S':'>H','l':'<I','L':'>I',} + pattern0 = re.compile(r''' # mere offset + ^ + &? # possible ampersand + ( 0 # just zero + | [1-9]{1,1}[0-9]* # decimal + | 0[0-7]+ # octal + | 0x[0-9a-f]+ # hex + ) + $ + ''', re.X|re.I + ) + pattern1 = re.compile(r''' # indirect offset + ^\( + (?P<base>&?0 # just zero + |&?[1-9]{1,1}[0-9]* # decimal + |&?0[0-7]* # octal + |&?0x[0-9A-F]+ # hex + ) + (?P<type> + \. # this dot might be alone + [BSL]? # one of this chars in either case + )? + (?P<sign> + [-+]{0,1} + )? + (?P<off>0 # just zero + |[1-9]{1,1}[0-9]* # decimal + |0[0-7]* # octal + |0x[0-9a-f]+ # hex + )? + \)$''', re.X|re.I + ) + def __init__(self,s): + self.source = s + self.value = None + self.relative = 0 + self.base = self.type = self.sign = self.offs = None + m = Offset.pattern0.match( s ) + if m : # just a number + if s[0] == '&' : + self.relative, self.value = 1, int( s[1:], 0 ) + else: + self.value = int( s, 0 ) + return + m = Offset.pattern1.match( s ) + if m : # real indirect offset + try: + self.base = m.group('base') + if self.base[0] == '&' : + self.relative, self.base = 1, int( self.base[1:], 0 ) + else: + self.base = int( self.base, 0 ) + if m.group('type') : self.type = m.group('type')[1:] + self.sign = m.group('sign') + if m.group('off') : self.offs = int( m.group('off'), 0 ) + if self.sign == '-' : self.offs = 0 - self.offs + except: + print >>sys.stderr, '$$', m.groupdict() + raise + return + raise OffsetError(`s`) + def __call__(self,file=None): + if self.value is not None : return self.value + pos = file.tell() + try: + if not self.relative : file.seek( self.offset, 0 ) + frmt = Offset.pos_format.get( self.type, 'I' ) + size = struct.calcsize( frmt ) + data = struct.unpack( frmt, file.read( size ) ) + if 
self.offs : data += self.offs + return data + finally: + file.seek( pos, 0 ) + def __str__(self): return self.source + def __repr__(self): return 'Offset(%s)' % `self.source` +#end class Offset + +class MagicFileError(MagicError): pass + +class MagicFile: + def __init__(self,filename=_magic): + self.file = None + self.tests = [] + self.total_tests = 0 + self.load( filename ) + self.ack_tests = None + self.nak_tests = None + def __del__(self): + self.close() + def load(self,filename=None): + self.open( filename ) + self.parse() + self.close() + def open(self,filename=None): + self.close() + if filename is not None : + self.filename = filename + self.file = open( self.filename, 'r', BUFFER_SIZE ) + def close(self): + if self.file : + self.file.close() + self.file = None + def parse(self): + line_no = 0 + for line in self.file.xreadlines() : + line_no += 1 + if not line or line[0]=='#' : continue + line = line.lstrip().rstrip('\r\n') + if not line or line[0]=='#' : continue + try: + x = self.parse_line( line ) + if x is None : + print >>sys.stderr, '#[%04d]#'%line_no, line + continue + except: + print >>sys.stderr, '###[%04d]###'%line_no, line + raise + self.total_tests += 1 + level, offset, mtype, test, message = x + new_test = MagicTest(offset,mtype,test,message, + line=line_no,level=level) + try: + if level == 0 : + self.tests.append( new_test ) + else: + self.tests[-1].add( new_test ) + except: + if 1 : + print >>sys.stderr, 'total tests=%s' % ( + `self.total_tests`, + ) + print >>sys.stderr, 'level=%s' % ( + `level`, + ) + print >>sys.stderr, 'tests=%s' % ( + pprint.pformat(self.tests), + ) + raise + else: + while self.tests[-1].level > 0 : + self.tests.pop() + def parse_line(self,line): + # print >>sys.stderr, 'line=[%s]' % line + if (not line) or line[0]=='#' : return None + level = 0 + offset = mtype = test = message = '' + mask = None + # get optional level (count leading '>') + while line and line[0]=='>' : + line, level = line[1:], level+1 + # get offset + 
while line and not line[0].isspace() : + offset, line = offset+line[0], line[1:] + try: + offset = Offset(offset) + except: + print >>sys.stderr, 'line=[%s]' % line + raise + # skip spaces + line = line.lstrip() + # get type + c = None + while line : + last_c, c, line = c, line[0], line[1:] + if last_c!='\\' and c.isspace() : + break # unescaped space - end of field + else: + mtype += c + if last_c == '\\' : + c = None # don't fuck my brain with sequential backslashes + # skip spaces + line = line.lstrip() + # get test + c = None + while line : + last_c, c, line = c, line[0], line[1:] + if last_c!='\\' and c.isspace() : + break # unescaped space - end of field + else: + test += c + if last_c == '\\' : + c = None # don't fuck my brain with sequential backslashes + # skip spaces + line = line.lstrip() + # get message + message = line + if mime and line.find("\t") != -1: + message=line[0:line.find("\t")] + # + # print '>>', level, offset, mtype, test, message + return level, offset, mtype, test, message + def detect(self,file): + self.ack_tests = 0 + self.nak_tests = 0 + answers = [] + for test in self.tests : + message = test.run( file ) + if message : + self.ack_tests += 1 + answers.append( message ) + else: + self.nak_tests += 1 + if answers : + return '; '.join( answers ) +#end class MagicFile + +def username(uid): + try: + return pwd.getpwuid( uid )[0] + except: + return '#%s'%uid + +def groupname(gid): + try: + return grp.getgrgid( gid )[0] + except: + return '#%s'%gid + +def get_file_type(fname,follow): + t = None + if not follow : + try: + st = os.lstat( fname ) # stat that entry, don't follow links! + except os.error, why : + pass + else: + if stat.S_ISLNK(st[stat.ST_MODE]) : + t = 'symbolic link' + try: + lnk = os.readlink( fname ) + except: + t += ' (unreadable)' + else: + t += ' to '+lnk + if t is None : + try: + st = os.stat( fname ) + except os.error, why : + return "can't stat `%s' (%s)." 
% (why.filename,why.strerror) + + dmaj, dmin = (st.st_rdev>>8)&0x0FF, st.st_rdev&0x0FF + + if 0 : pass + elif stat.S_ISSOCK(st.st_mode) : t = 'socket' + elif stat.S_ISLNK (st.st_mode) : t = follow and 'symbolic link' or t + elif stat.S_ISREG (st.st_mode) : t = 'file' + elif stat.S_ISBLK (st.st_mode) : t = 'block special (%d/%d)'%(dmaj,dmin) + elif stat.S_ISDIR (st.st_mode) : t = 'directory' + elif stat.S_ISCHR (st.st_mode) : t = 'character special (%d/%d)'%(dmaj,dmin) + elif stat.S_ISFIFO(st.st_mode) : t = 'pipe' + else: t = '<unknown>' + + if st.st_mode & stat.S_ISUID : + t = 'setuid(%d=%s) %s'%(st.st_uid,username(st.st_uid),t) + if st.st_mode & stat.S_ISGID : + t = 'setgid(%d=%s) %s'%(st.st_gid,groupname(st.st_gid),t) + if st.st_mode & stat.S_ISVTX : + t = 'sticky '+t + + return t + +HELP = '''%s [options] [files...] + +Options: + + -?, --help -- this help + -m, --magic=<file> -- use this magic <file> instead of %s + -f, --files=<namefile> -- read filenames for <namefile> +* -C, --compile -- write "compiled" magic file + -b, --brief -- don't prepend filenames to output lines ++ -c, --check -- check the magic file + -i, --mime -- output MIME types +* -k, --keep-going -- don't stop st the first match + -n, --flush -- flush stdout after each line + -v, --verson -- print version and exit +* -z, --compressed -- try to look inside compressed files + -L, --follow -- follow symlinks + -s, --special -- don't skip special files + +* -- not implemented so far ;-) ++ -- implemented, but in another way... 
+''' + +def main(): + import getopt + global _magic + try: + brief = 0 + flush = 0 + follow= 0 + mime = 0 + check = 0 + special=0 + try: + opts, args = getopt.getopt( + sys.argv[1:], + '?m:f:CbciknvzLs', + ( 'help', + 'magic=', + 'names=', + 'compile', + 'brief', + 'check', + 'mime', + 'keep-going', + 'flush', + 'version', + 'compressed', + 'follow', + 'special', + ) + ) + except getopt.error, why: + print >>sys.stderr, sys.argv[0], why + return 1 + else: + files = None + for o,v in opts : + if o in ('-?','--help'): + print HELP % ( + sys.argv[0], + _magic, + ) + return 0 + elif o in ('-f','--files='): + files = v + elif o in ('-m','--magic='): + _magic = v[:] + elif o in ('-C','--compile'): + pass + elif o in ('-b','--brief'): + brief = 1 + elif o in ('-c','--check'): + check = 1 + elif o in ('-i','--mime'): + mime = 1 + if os.path.exists( _magic+'.mime' ) : + _magic += '.mime' + print >>sys.stderr,sys.argv[0]+':',\ + "Using regular magic file `%s'" % _magic + elif o in ('-k','--keep-going'): + pass + elif o in ('-n','--flush'): + flush = 1 + elif o in ('-v','--version'): + print 'VERSION' + return 0 + elif o in ('-z','--compressed'): + pass + elif o in ('-L','--follow'): + follow = 1 + elif o in ('-s','--special'): + special = 1 + else: + if files : + files = map(lambda x: x.strip(), v.split(',')) + if '-' in files and '-' in args : + error( 1, 'cannot use STDIN simultaneously for file list and data' ) + for file in files : + for name in ( + (file=='-') + and sys.stdin + or open(file,'r',BUFFER_SIZE) + ).xreadlines(): + name = name.strip() + if name not in args : + args.append( name ) + try: + if check : print >>sys.stderr, 'Loading magic database...' 
+ t0 = time.time() + m = MagicFile(_magic) + t1 = time.time() + if check : + print >>sys.stderr, \ + m.total_tests, 'tests loaded', \ + 'for', '%.2f' % (t1-t0), 'seconds' + print >>sys.stderr, len(m.tests), 'tests at top level' + return 0 # XXX "shortened" form ;-) + + mlen = max( map(len, args) )+1 + for arg in args : + if not brief : print (arg + ':').ljust(mlen), + ftype = get_file_type( arg, follow ) + if (special and ftype.find('special')>=0) \ + or ftype[-4:] == 'file' : + t0 = time.time() + try: + t = m.detect( arg ) + except (IOError,os.error), why: + t = "can't read `%s' (%s)" % (why.filename,why.strerror) + if ftype[-4:] == 'file' : t = ftype[:-4] + t + t1 = time.time() + print t and t or 'data' + if 0 : print \ + '#\t%d tests ok, %d tests failed for %.2f seconds'%\ + (m.ack_tests, m.nak_tests, t1-t0) + else: + print mime and 'application/x-not-regular-file' or ftype + if flush : sys.stdout.flush() + # print >>sys.stderr, 'DONE' + except: + if check : return 1 + raise + else: + return 0 + finally: + pass + +if __name__ == '__main__' : + sys.exit( main() ) +# vim:ai +# EOF # diff --git a/contrib/gnatsparse/specialuu.py b/contrib/gnatsparse/specialuu.py new file mode 100755 index 000000000..b729d9c59 --- /dev/null +++ b/contrib/gnatsparse/specialuu.py @@ -0,0 +1,104 @@ +#! /usr/bin/env python2.2 + +# Copyright 1994 by Lance Ellinghouse +# Cathedral City, California Republic, United States of America. +# All Rights Reserved +# Permission to use, copy, modify, and distribute this software and its +# documentation for any purpose and without fee is hereby granted, +# provided that the above copyright notice appear in all copies and that +# both that copyright notice and this permission notice appear in +# supporting documentation, and that the name of Lance Ellinghouse +# not be used in advertising or publicity pertaining to distribution +# of the software without specific, written prior permission. 
# LANCE ELLINGHOUSE DISCLAIMS ALL WARRANTIES WITH REGARD TO
# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS, IN NO EVENT SHALL LANCE ELLINGHOUSE CENTRUM BE LIABLE
# FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
# Modified by Jack Jansen, CWI, July 1995:
# - Use binascii module to do the actual line-by-line conversion
#   between ascii and binary. This results in a 1000-fold speedup. The C
#   version is still 5 times faster, though.
# - Arguments more compliant with python standard

"""Implementation of the UUencode and UUdecode functions.

encode(in_file, out_file [,name, mode])
decode(in_file [, out_file, mode])

This is a modified copy of the standard library's uu module: decode()
additionally returns the file name from the 'begin' header and the byte
offsets of the encoded block, so gnatsparse can excise uuencoded
attachments from a GNATS report body.
"""

import binascii
import os
import sys

__all__ = ["Error", "decode"]

class Error(Exception):
    """Raised when no valid uuencoded block can be decoded."""
    pass

def decode(in_file, out_file=None, mode=None, quiet=0):
    """Decode the first uuencoded block found in in_file.

    in_file  -- '-' for stdin, a file name, or an open file-like object.
    out_file -- '-' for stdout, a file name, a writable binary
                file-like object, or None to use the name recorded on
                the 'begin' line (refusing to overwrite an existing
                file).
    mode     -- permission bits for a newly created out_file; defaults
                to the mode recorded on the 'begin' line.
    quiet    -- when true, suppress warnings about malformed lines that
                were decoded via the length-byte workaround.

    Returns (filename, start_pos, end_pos): the name on the 'begin'
    line and the offsets in in_file where the encoded block starts and
    ends, so the caller can cut it out of the surrounding text.

    Raises Error if no valid 'begin' line is present, or if out_file
    would overwrite an existing file.
    """
    #
    # Open the input file, if needed.
    #
    if in_file == '-':
        in_file = sys.stdin
    elif isinstance(in_file, str):
        in_file = open(in_file)
    #
    # Read until a begin is encountered or we've exhausted the file
    #
    while 1:
        hdr = in_file.readline()
        if not hdr:
            raise Error('No valid begin line found in input file')
        if hdr[:5] != 'begin':
            continue
        hdrfields = hdr.split(" ", 2)
        if len(hdrfields) == 3 and hdrfields[0] == 'begin':
            try:
                int(hdrfields[1], 8)
                # Remember where the 'begin' line itself starts; the
                # caller uses this to excise the block from the text.
                start_pos = in_file.tell() - len(hdr)
                break
            except ValueError:
                pass
    if out_file is None:
        out_file = hdrfields[2].rstrip()
        if os.path.exists(out_file):
            raise Error('Cannot overwrite existing file: %s' % out_file)
    if mode is None:
        mode = int(hdrfields[1], 8)
    #
    # Open the output file
    #
    if out_file == '-':
        out_file = sys.stdout
    elif isinstance(out_file, str):
        fp = open(out_file, 'wb')
        try:
            # BUG FIX: this previously called os.path.chmod(), which
            # does not exist, so the AttributeError guard always fired
            # and the mode from the 'begin' line was silently ignored.
            # os.chmod() is the correct call; the guard is kept for
            # platforms that lack chmod.
            os.chmod(out_file, mode)
        except AttributeError:
            pass
        out_file = fp
    #
    # Main decoding loop
    #
    s = in_file.readline()
    while s and s.strip() != 'end':
        try:
            data = binascii.a2b_uu(s)
        except binascii.Error as v:
            # Workaround for broken uuencoders by /Fredrik Lundh:
            # trust the length byte and ignore any trailing garbage.
            nbytes = (((ord(s[0]) - 32) & 63) * 4 + 5) // 3
            data = binascii.a2b_uu(s[:nbytes])
            if not quiet:
                sys.stderr.write("Warning: %s\n" % str(v))
        out_file.write(data)
        s = in_file.readline()
    # NOTE: a missing 'end' line is deliberately tolerated (truncated
    # input is common in mail archives); the stdlib version raised here.
    return (hdrfields[2].rstrip(), start_pos, in_file.tell())