diff options
Diffstat (limited to 'contrib/gnatsparse')
-rwxr-xr-x | contrib/gnatsparse/README | 62 | ||||
-rwxr-xr-x | contrib/gnatsparse/gnatsparse.py | 807 | ||||
-rwxr-xr-x | contrib/gnatsparse/magic.py | 712 | ||||
-rwxr-xr-x | contrib/gnatsparse/specialuu.py | 104 |
4 files changed, 0 insertions, 1685 deletions
diff --git a/contrib/gnatsparse/README b/contrib/gnatsparse/README deleted file mode 100755 index f7cf01c68..000000000 --- a/contrib/gnatsparse/README +++ /dev/null @@ -1,62 +0,0 @@ -gnatsparse -========== - -Author: Daniel Berlin <dan@dberlin.org> - -gnatsparse is a simple Python program that imports a GNATS database -into a Bugzilla system. It is based on the gnats2bz.pl Perl script -but it's a rewrite at the same time. Its parser is based on gnatsweb, -which gives a 10 times speed improvement compared to the previous code. - -Features --------- - -* Chunks audit trail into separate comments, with the right From's, times, etc. - -* Handles followup emails that are in the report, with the right From's, times, -etc. - -* Properly handles duplicates, adding the standard bugzilla duplicate message. - -* Extracts and handles gnatsweb attachments, as well as uuencoded attachments -appearing in either followup emails, the how-to-repeat field, etc. Replaces -them with a message to look at the attachments list, and adds the standard -"Created an attachment" message that bugzilla uses. Handling them includes -giving them the right name and mime-type. "attachments" means multiple -uuencoded things/gnatsweb attachments are handled properly. - -* Handles reopened bug reports. - -* Builds the cc list from the people who have commented on the report, -and the reporter. - -Requirements ------------- - -It requires python 2.2+, it won't work with 1.5.2 (Linux distributions -ship with 2.2+ these days, so that shouldn't be an issue). - -Documentation -------------- - -Documentation can be found inside the scripts. The source code is self -documenting. - -Issues for someone trying to use it to convert a gnats install ------------------------------------ - -1. We have three custom fields bugzilla doesn't ship with, -gcchost, gcctarget, and gccbuild. -We removed two bugzilla fields, rep_platform and op_sys. -If you use the latter instead of the former, you'll need to -update the script to account for this. -2. Because gcc attachments consist of preprocessed source, all attachments -inserted into the attachment database are compressed with zlib.compress. -This requires associated bugzilla changes to decompress before sending to -the browser. -Unless you want to make those changes (it's roughly 3 lines), you'll -need to remove the zlib.compress call. -3. You will need to come up with your own release to version mapping and -install it. -4. Obviously, any extra gnats fields you have added will have to -be handled in some manner. diff --git a/contrib/gnatsparse/gnatsparse.py b/contrib/gnatsparse/gnatsparse.py deleted file mode 100755 index 9eef18b89..000000000 --- a/contrib/gnatsparse/gnatsparse.py +++ /dev/null @@ -1,807 +0,0 @@ -try: -# Using Psyco makes it about 25% faster, but there's a bug in psyco in -# handling of eval causing it to use unlimited memory with the magic -# file enabled. -# import psyco -# psyco.full() -# from psyco.classes import * - pass -except: - pass -import re -import base64 -import cStringIO -import specialuu -import array -import email.Utils -import zlib -import magic - -# Comment out if you don't want magic detection -magicf = magic.MagicFile() - -# Open our output file -outfile = open("gnats2bz_data.sql", "w") - -# List of GNATS fields -fieldnames = ("Number", "Category", "Synopsis", "Confidential", "Severity", - "Priority", "Responsible", "State", "Quarter", "Keywords", - "Date-Required", "Class", "Submitter-Id", "Arrival-Date", - "Closed-Date", "Last-Modified", "Originator", "Release", - "Organization", "Environment", "Description", "How-To-Repeat", - "Fix", "Release-Note", "Audit-Trail", "Unformatted") - -# Dictionary telling us which GNATS fields are multiline -multilinefields = {"Organization":1, "Environment":1, "Description":1, - "How-To-Repeat":1, "Fix":1, "Release-Note":1, - "Audit-Trail":1, "Unformatted":1} - -# Mapping of GCC release to version. Our version string is updated every -# so we need to funnel all release's with 3.4 in the string to be version -# 3.4 for bug tracking purposes -# The key is a regex to match, the value is the version it corresponds -# with -releasetovermap = {r"3\.4":"3.4", r"3\.3":"3.3", r"3\.2\.2":"3.2.2", - r"3\.2\.1":"3.2.1", r"3\.2":"3.2", r"3\.1\.2":"3.1.2", - r"3\.1\.1":"3.1.1", r"3\.1":"3.1", r"3\.0\.4":"3.0.4", - r"3\.0\.3":"3.0.3", r"3\.0\.2":"3.0.2", r"3\.0\.1":"3.0.1", - r"3\.0":"3.0", r"2\.95\.4":"2.95.4", r"2\.95\.3":"2.95.3", - r"2\.95\.2":"2.95.2", r"2\.95\.1":"2.95.1", - r"2\.95":"2.95", r"2\.97":"2.97", - r"2\.96.*[rR][eE][dD].*[hH][aA][tT]":"2.96 (redhat)", - r"2\.96":"2.96"} - -# These map the field name to the field id bugzilla assigns. We need -# the id when doing bug activity. -fieldids = {"State":8, "Responsible":15} - -# These are the keywords we use in gcc bug tracking. They are transformed -# into bugzilla keywords. The format here is <keyword>-><bugzilla keyword id> -keywordids = {"wrong-code":1, "ice-on-legal-code":2, "ice-on-illegal-code":3, - "rejects-legal":4, "accepts-illegal":5, "pessimizes-code":6} - -# Map from GNATS states to Bugzilla states. Duplicates and reopened bugs -# are handled when parsing the audit trail, so no need for them here. -state_lookup = {"":"NEW", "open":"ASSIGNED", "analyzed":"ASSIGNED", - "feedback":"WAITING", "closed":"CLOSED", - "suspended":"SUSPENDED"} - -# Table of versions that exist in the bugs, built up as we go along -versions_table = {} - -# Delimiter gnatsweb uses for attachments -attachment_delimiter = "----gnatsweb-attachment----\n" - -# Here starts the various regular expressions we use -# Matches an entire GNATS single line field -gnatfieldre = re.compile(r"""^([>\w\-]+)\s*:\s*(.*)\s*$""") - -# Matches the name of a GNATS field -fieldnamere = re.compile(r"""^>(.*)$""") - -# Matches the useless part of an envelope -uselessre = re.compile(r"""^(\S*?):\s*""", re.MULTILINE) - -# Matches the filename in a content disposition -dispositionre = re.compile("(\\S+);\\s*filename=\"([^\"]+)\"") - -# Matches the last changed date in the entire text of a bug -# If you have other editable fields that get audit trail entries, modify this -# The field names are explicitly listed in order to speed up matching -lastdatere = re.compile(r"""^(?:(?:State|Responsible|Priority|Severity)-Changed-When: )(.+?)$""", re.MULTILINE) - -# Matches the From line of an email or the first line of an audit trail entry -# We use this re to find the begin lines of all the audit trail entries -# The field names are explicitly listed in order to speed up matching -fromtore=re.compile(r"""^(?:(?:State|Responsible|Priority|Severity)-Changed-From-To: |From: )""", re.MULTILINE) - -# These re's match the various parts of an audit trail entry -changedfromtore=re.compile(r"""^(\w+?)-Changed-From-To: (.+?)$""", re.MULTILINE) -changedbyre=re.compile(r"""^\w+?-Changed-By: (.+?)$""", re.MULTILINE) -changedwhenre=re.compile(r"""^\w+?-Changed-When: (.+?)$""", re.MULTILINE) -changedwhyre=re.compile(r"""^\w+?-Changed-Why:\s*(.*?)$""", re.MULTILINE) - -# This re matches audit trail text saying that the current bug is a duplicate of another -duplicatere=re.compile(r"""(?:")?Dup(?:licate)?(?:d)?(?:")? of .*?(\d+)""", re.IGNORECASE | re.MULTILINE) - -# Get the text of a From: line -fromre=re.compile(r"""^From: (.*?)$""", re.MULTILINE) - -# Get the text of a Date: Line -datere=re.compile(r"""^Date: (.*?)$""", re.MULTILINE) - -# Map of the responsible file to email addresses -responsible_map = {} -# List of records in the responsible file -responsible_list = [] -# List of records in the categories file -categories_list = [] -# List of pr's in the index -pr_list = [] -# Map usernames to user ids -usermapping = {} -# Start with this user id -userid_base = 2 - -# Name of gnats user -gnats_username = "gnats@gcc.gnu.org" -# Name of unassigned user -unassigned_username = "unassigned@gcc.gnu.org" - -gnats_db_dir = "." -product = "gcc" -productdesc = "GNU Compiler Connection" -milestoneurl = "http://gcc/gnu.org" -defaultmilestone = "3.4" - -def write_non_bug_tables(): - """ Write out the non-bug related tables, such as products, profiles, etc.""" - # Set all non-unconfirmed bugs's everconfirmed flag - print >>outfile, "update bugs set everconfirmed=1 where bug_status != 'UNCONFIRMED';" - - # Set all bugs assigned to the unassigned user to NEW - print >>outfile, "update bugs set bug_status='NEW',assigned_to='NULL' where bug_status='ASSIGNED' AND assigned_to=3;" - - # Insert the products - print >>outfile, "\ninsert into products (" - print >>outfile, " product, description, milestoneurl, isactive," - print >>outfile, " defaultmilestone, votestoconfirm) values (" - print >>outfile, " '%s', '%s', '%s', 1, '%s', 1);" % (product, - productdesc, - milestoneurl, - defaultmilestone) - - # Insert the components - for category in categories_list: - component = SqlQuote(category[0]) - productstr = SqlQuote(product) - description = SqlQuote(category[1]) - initialowner = SqlQuote("3") - print >>outfile, "\ninsert into components ("; - print >>outfile, " value, program, initialowner, initialqacontact," - print >>outfile, " description) values (" - print >>outfile, " %s, %s, %s, '', %s);" % (component, productstr, - initialowner, description) - - # Insert the versions - for productstr, version_list in versions_table.items(): - productstr = SqlQuote(productstr) - for version in version_list: - version = SqlQuote(version) - print >>outfile, "\ninsert into versions (value, program) " - print >>outfile, " values (%s, %s);" % (version, productstr) - - # Insert the users - for username, userid in usermapping.items(): - realname = map_username_to_realname(username) - username = SqlQuote(username) - realname = SqlQuote(realname) - print >>outfile, "\ninsert into profiles (" - print >>outfile, " userid, login_name, password, cryptpassword, realname, groupset" - print >>outfile, ") values (" - print >>outfile, "%s,%s,'password',encrypt('password'), %s, 0);" % (userid, username, realname) - print >>outfile, "update profiles set groupset=1 << 32 where login_name like '%\@gcc.gnu.org';" - -def unixdate2datetime(unixdate): - """ Convert a unix date to a datetime value """ - year, month, day, hour, min, sec, x, x, x, x = email.Utils.parsedate_tz(unixdate) - return "%d-%02d-%02d %02d:%02d:%02d" % (year,month,day,hour,min,sec) - -def unixdate2timestamp(unixdate): - """ Convert a unix date to a timestamp value """ - year, month, day, hour, min, sec, x, x, x, x = email.Utils.parsedate_tz(unixdate) - return "%d%02d%02d%02d%02d%02d" % (year,month,day,hour,min,sec) - -def SqlQuote(str): - """ Perform SQL quoting on a string """ - return "'%s'" % str.replace("'", """''""").replace("\\", "\\\\").replace("\0","\\0") - -def convert_gccver_to_ver(gccver): - """ Given a gcc version, convert it to a Bugzilla version. """ - for k in releasetovermap.keys(): - if re.search(".*%s.*" % k, gccver) is not None: - return releasetovermap[k] - result = re.search(r""".*(\d\.\d) \d+ \(experimental\).*""", gccver) - if result is not None: - return result.group(1) - return "unknown" - -def load_index(fname): - """ Load in the GNATS index file """ - global pr_list - ifp = open(fname) - for record in ifp.xreadlines(): - fields = record.split("|") - pr_list.append(fields[0]) - ifp.close() - -def load_categories(fname): - """ Load in the GNATS categories file """ - global categories_list - cfp = open(fname) - for record in cfp.xreadlines(): - if re.search("^#", record) is not None: - continue - categories_list.append(record.split(":")) - cfp.close() - -def map_username_to_realname(username): - """ Given a username, find the real name """ - name = username - name = re.sub("@.*", "", name) - for responsible_record in responsible_list: - if responsible_record[0] == name: - return responsible_record[1] - if len(responsible_record) > 2: - if responsible_record[2] == username: - return responsible_record[1] - return "" - - -def get_userid(responsible): - """ Given an email address, get the user id """ - global responsible_map - global usermapping - global userid_base - if responsible is None: - return -1 - responsible = responsible.lower() - responsible = re.sub("sources.redhat.com", "gcc.gnu.org", responsible) - if responsible_map.has_key(responsible): - responsible = responsible_map[responsible] - if usermapping.has_key(responsible): - return usermapping[responsible] - else: - usermapping[responsible] = userid_base - userid_base += 1 - return usermapping[responsible] - -def load_responsible(fname): - """ Load in the GNATS responsible file """ - global responsible_map - global responsible_list - rfp = open(fname) - for record in rfp.xreadlines(): - if re.search("^#", record) is not None: - continue - split_record = record.split(":") - responsible_map[split_record[0]] = split_record[2].rstrip() - responsible_list.append(record.split(":")) - rfp.close() - -def split_csl(list): - """ Split a comma separated list """ - newlist = re.split(r"""\s*,\s*""", list) - return newlist - -def fix_email_addrs(addrs): - """ Perform various fixups and cleaning on an e-mail address """ - addrs = split_csl(addrs) - trimmed_addrs = [] - for addr in addrs: - addr = re.sub(r"""\(.*\)""","",addr) - addr = re.sub(r""".*<(.*)>.*""","\\1",addr) - addr = addr.rstrip() - addr = addr.lstrip() - trimmed_addrs.append(addr) - addrs = ", ".join(trimmed_addrs) - return addrs - -class Bugzillabug(object): - """ Class representing a bugzilla bug """ - def __init__(self, gbug): - """ Initialize a bugzilla bug from a GNATS bug. """ - self.bug_id = gbug.bug_id - self.long_descs = [] - self.bug_ccs = [get_userid("gcc-bugs@gcc.gnu.org")] - self.bug_activity = [] - self.attachments = gbug.attachments - self.gnatsfields = gbug.fields - self.need_unformatted = gbug.has_unformatted_attach == 0 - self.need_unformatted &= gbug.fields.has_key("Unformatted") - self.translate_pr() - self.update_versions() - if self.fields.has_key("Audit-Trail"): - self.parse_audit_trail() - self.write_bug() - - def parse_fromto(type, string): - """ Parses the from and to parts of a changed-from-to line """ - fromstr = "" - tostr = "" - - # Some slightly messed up changed lines have unassigned-new, - # instead of unassigned->new. So we make the > optional. - result = re.search(r"""(.*)-(?:>?)(.*)""", string) - - # Only know how to handle parsing of State and Responsible - # changed-from-to right now - if type == "State": - fromstr = state_lookup[result.group(1)] - tostr = state_lookup[result.group(2)] - elif type == "Responsible": - if result.group(1) != "": - fromstr = result.group(1) - if result.group(2) != "": - tostr = result.group(2) - if responsible_map.has_key(fromstr): - fromstr = responsible_map[fromstr] - if responsible_map.has_key(tostr): - tostr = responsible_map[tostr] - return (fromstr, tostr) - parse_fromto = staticmethod(parse_fromto) - - def parse_audit_trail(self): - """ Parse a GNATS audit trail """ - trail = self.fields["Audit-Trail"] - # Begin to split the audit trail into pieces - result = fromtore.finditer(trail) - starts = [] - ends = [] - pieces = [] - # Make a list of the pieces - for x in result: - pieces.append (x) - # Find the start and end of each piece - if len(pieces) > 0: - for x in xrange(len(pieces)-1): - starts.append(pieces[x].start()) - ends.append(pieces[x+1].start()) - starts.append(pieces[-1].start()) - ends.append(len(trail)) - pieces = [] - # Now make the list of actual text of the pieces - for x in xrange(len(starts)): - pieces.append(trail[starts[x]:ends[x]]) - # And parse the actual pieces - for piece in pieces: - result = changedfromtore.search(piece) - # See what things we actually have inside this entry, and - # handle them appropriately - if result is not None: - type = result.group(1) - changedfromto = result.group(2) - # If the bug was reopened, mark it as such - if changedfromto.find("closed->analyzed") != -1: - if self.fields["bug_status"] == "'NEW'": - self.fields["bug_status"] = "'REOPENED'" - if type == "State" or type == "Responsible": - oldstate, newstate = self.parse_fromto (type, changedfromto) - result = changedbyre.search(piece) - if result is not None: - changedby = result.group(1) - result = changedwhenre.search(piece) - if result is not None: - changedwhen = result.group(1) - changedwhen = unixdate2datetime(changedwhen) - changedwhen = SqlQuote(changedwhen) - result = changedwhyre.search(piece) - changedwhy = piece[result.start(1):] - #changedwhy = changedwhy.lstrip() - changedwhy = changedwhy.rstrip() - changedby = get_userid(changedby) - # Put us on the cc list if we aren't there already - if changedby != self.fields["userid"] \ - and changedby not in self.bug_ccs: - self.bug_ccs.append(changedby) - # If it's a duplicate, mark it as such - result = duplicatere.search(changedwhy) - if result is not None: - newtext = "*** This bug has been marked as a duplicate of %s ***" % result.group(1) - newtext = SqlQuote(newtext) - self.long_descs.append((self.bug_id, changedby, - changedwhen, newtext)) - self.fields["bug_status"] = "'RESOLVED'" - self.fields["resolution"] = "'DUPLICATE'" - self.fields["userid"] = changedby - else: - newtext = "%s-Changed-From-To: %s\n%s-Changed-Why: %s\n" % (type, changedfromto, type, changedwhy) - newtext = SqlQuote(newtext) - self.long_descs.append((self.bug_id, changedby, - changedwhen, newtext)) - if type == "State" or type == "Responsible": - newstate = SqlQuote("%s" % newstate) - oldstate = SqlQuote("%s" % oldstate) - fieldid = fieldids[type] - self.bug_activity.append((newstate, oldstate, fieldid, changedby, changedwhen)) - - else: - # It's an email - result = fromre.search(piece) - if result is None: - continue - fromstr = result.group(1) - fromstr = fix_email_addrs(fromstr) - fromstr = get_userid(fromstr) - result = datere.search(piece) - if result is None: - continue - datestr = result.group(1) - datestr = SqlQuote(unixdate2timestamp(datestr)) - if fromstr != self.fields["userid"] \ - and fromstr not in self.bug_ccs: - self.bug_ccs.append(fromstr) - self.long_descs.append((self.bug_id, fromstr, datestr, - SqlQuote(piece))) - - - - def write_bug(self): - """ Output a bug to the data file """ - fields = self.fields - print >>outfile, "\ninsert into bugs(" - print >>outfile, " bug_id, assigned_to, bug_severity, priority, bug_status, creation_ts, delta_ts," - print >>outfile, " short_desc," - print >>outfile, " reporter, version," - print >>outfile, " product, component, resolution, target_milestone, qa_contact," - print >>outfile, " gccbuild, gcctarget, gcchost, keywords" - print >>outfile, " ) values (" - print >>outfile, "%s, %s, %s, %s, %s, %s, %s," % (self.bug_id, fields["userid"], fields["bug_severity"], fields["priority"], fields["bug_status"], fields["creation_ts"], fields["delta_ts"]) - print >>outfile, "%s," % (fields["short_desc"]) - print >>outfile, "%s, %s," % (fields["reporter"], fields["version"]) - print >>outfile, "%s, %s, %s, %s, 0," %(fields["product"], fields["component"], fields["resolution"], fields["target_milestone"]) - print >>outfile, "%s, %s, %s, %s" % (fields["gccbuild"], fields["gcctarget"], fields["gcchost"], fields["keywords"]) - print >>outfile, ");" - if self.fields["keywords"] != 0: - print >>outfile, "\ninsert into keywords (bug_id, keywordid) values (" - print >>outfile, " %s, %s);" % (self.bug_id, fields["keywordid"]) - for id, who, when, text in self.long_descs: - print >>outfile, "\ninsert into longdescs (" - print >>outfile, " bug_id, who, bug_when, thetext) values(" - print >>outfile, " %s, %s, %s, %s);" % (id, who, when, text) - for name, data, who in self.attachments: - print >>outfile, "\ninsert into attachments (" - print >>outfile, " bug_id, filename, description, mimetype, ispatch, submitter_id) values (" - ftype = None - # It's *magic*! - if name.endswith(".ii") == 1: - ftype = "text/x-c++" - elif name.endswith(".i") == 1: - ftype = "text/x-c" - else: - ftype = magicf.detect(cStringIO.StringIO(data)) - if ftype is None: - ftype = "application/octet-stream" - - print >>outfile, "%s,%s,%s, %s,0, %s,%s);" %(self.bug_id, SqlQuote(name), SqlQuote(name), SqlQuote (ftype), who) - print >>outfile, "\ninsert into attach_data (" - print >>outfile, "\n(id, thedata) values (last_insert_id()," - print >>outfile, "%s);" % (SqlQuote(zlib.compress(data))) - for newstate, oldstate, fieldid, changedby, changedwhen in self.bug_activity: - print >>outfile, "\ninsert into bugs_activity (" - print >>outfile, " bug_id, who, bug_when, fieldid, added, removed) values (" - print >>outfile, " %s, %s, %s, %s, %s, %s);" % (self.bug_id, - changedby, - changedwhen, - fieldid, - newstate, - oldstate) - for cc in self.bug_ccs: - print >>outfile, "\ninsert into cc(bug_id, who) values (%s, %s);" %(self.bug_id, cc) - def update_versions(self): - """ Update the versions table to account for the version on this bug """ - global versions_table - if self.fields.has_key("Release") == 0 \ - or self.fields.has_key("Category") == 0: - return - curr_product = "gcc" - curr_version = self.fields["Release"] - if curr_version == "": - return - curr_version = convert_gccver_to_ver (curr_version) - if versions_table.has_key(curr_product) == 0: - versions_table[curr_product] = [] - for version in versions_table[curr_product]: - if version == curr_version: - return - versions_table[curr_product].append(curr_version) - def translate_pr(self): - """ Transform a GNATS PR into a Bugzilla bug """ - self.fields = self.gnatsfields - if (self.fields.has_key("Organization") == 0) \ - or self.fields["Organization"].find("GCC"): - self.fields["Originator"] = "" - self.fields["Organization"] = "" - self.fields["Organization"].lstrip() - if (self.fields.has_key("Release") == 0) \ - or self.fields["Release"] == "" \ - or self.fields["Release"].find("unknown-1.0") != -1: - self.fields["Release"]="unknown" - if self.fields.has_key("Responsible"): - result = re.search(r"""\w+""", self.fields["Responsible"]) - self.fields["Responsible"] = "%s%s" % (result.group(0), "@gcc.gnu.org") - self.fields["gcchost"] = "" - self.fields["gcctarget"] = "" - self.fields["gccbuild"] = "" - if self.fields.has_key("Environment"): - result = re.search("^host: (.+?)$", self.fields["Environment"], - re.MULTILINE) - if result is not None: - self.fields["gcchost"] = result.group(1) - result = re.search("^target: (.+?)$", self.fields["Environment"], - re.MULTILINE) - if result is not None: - self.fields["gcctarget"] = result.group(1) - result = re.search("^build: (.+?)$", self.fields["Environment"], - re.MULTILINE) - if result is not None: - self.fields["gccbuild"] = result.group(1) - self.fields["userid"] = get_userid(self.fields["Responsible"]) - self.fields["bug_severity"] = "normal" - if self.fields["Class"] == "change-request": - self.fields["bug_severity"] = "enhancement" - elif self.fields.has_key("Severity"): - if self.fields["Severity"] == "critical": - self.fields["bug_severity"] = "critical" - elif self.fields["Severity"] == "serious": - self.fields["bug_severity"] = "major" - elif self.fields.has_key("Synopsis"): - if re.search("crash|assert", self.fields["Synopsis"]): - self.fields["bug_severity"] = "critical" - elif re.search("wrong|error", self.fields["Synopsis"]): - self.fields["bug_severity"] = "major" - self.fields["bug_severity"] = SqlQuote(self.fields["bug_severity"]) - self.fields["keywords"] = 0 - if keywordids.has_key(self.fields["Class"]): - self.fields["keywords"] = self.fields["Class"] - self.fields["keywordid"] = keywordids[self.fields["Class"]] - self.fields["keywords"] = SqlQuote(self.fields["keywords"]) - self.fields["priority"] = "P1" - if self.fields.has_key("Severity") and self.fields.has_key("Priority"): - severity = self.fields["Severity"] - priority = self.fields["Priority"] - if severity == "critical": - if priority == "high": - self.fields["priority"] = "Highest" - else: - self.fields["priority"] = "High" - elif severity == "serious": - if priority == "low": - self.fields["priority"] = "Low" - else: - self.fields["priority"] = "Normal" - else: - if priority == "high": - self.fields["priority"] = "Low" - else: - self.fields["priority"] = "Lowest" - self.fields["priority"] = SqlQuote(self.fields["priority"]) - state = self.fields["State"] - if (state == "open" or state == "analyzed") and self.fields["userid"] != 3: - self.fields["bug_status"] = "ASSIGNED" - self.fields["resolution"] = "" - elif state == "feedback": - self.fields["bug_status"] = "WAITING" - self.fields["resolution"] = "" - elif state == "closed": - self.fields["bug_status"] = "CLOSED" - if self.fields.has_key("Class"): - theclass = self.fields["Class"] - if theclass.find("duplicate") != -1: - self.fields["resolution"]="DUPLICATE" - elif theclass.find("mistaken") != -1: - self.fields["resolution"]="INVALID" - else: - self.fields["resolution"]="FIXED" - else: - self.fields["resolution"]="FIXED" - elif state == "suspended": - self.fields["bug_status"] = "SUSPENDED" - self.fields["resolution"] = "" - elif state == "analyzed" and self.fields["userid"] == 3: - self.fields["bug_status"] = "NEW" - self.fields["resolution"] = "" - else: - self.fields["bug_status"] = "UNCONFIRMED" - self.fields["resolution"] = "" - self.fields["bug_status"] = SqlQuote(self.fields["bug_status"]) - self.fields["resolution"] = SqlQuote(self.fields["resolution"]) - self.fields["creation_ts"] = "" - if self.fields.has_key("Arrival-Date") and self.fields["Arrival-Date"] != "": - self.fields["creation_ts"] = unixdate2datetime(self.fields["Arrival-Date"]) - self.fields["creation_ts"] = SqlQuote(self.fields["creation_ts"]) - self.fields["delta_ts"] = "" - if self.fields.has_key("Audit-Trail"): - result = lastdatere.findall(self.fields["Audit-Trail"]) - result.reverse() - if len(result) > 0: - self.fields["delta_ts"] = unixdate2timestamp(result[0]) - if self.fields["delta_ts"] == "": - if self.fields.has_key("Arrival-Date") and self.fields["Arrival-Date"] != "": - self.fields["delta_ts"] = unixdate2timestamp(self.fields["Arrival-Date"]) - self.fields["delta_ts"] = SqlQuote(self.fields["delta_ts"]) - self.fields["short_desc"] = SqlQuote(self.fields["Synopsis"]) - if self.fields.has_key("Reply-To") and self.fields["Reply-To"] != "": - self.fields["reporter"] = get_userid(self.fields["Reply-To"]) - elif self.fields.has_key("Mail-Header"): - result = re.search(r"""From .*?([\w.]+@[\w.]+)""", self.fields["Mail-Header"]) - if result: - self.fields["reporter"] = get_userid(result.group(1)) - else: - self.fields["reporter"] = get_userid(gnats_username) - else: - self.fields["reporter"] = get_userid(gnats_username) - long_desc = self.fields["Description"] - long_desc2 = "" - for field in ["Release", "Environment", "How-To-Repeat"]: - if self.fields.has_key(field) and self.fields[field] != "": - long_desc += ("\n\n%s:\n" % field) + self.fields[field] - if self.fields.has_key("Fix") and self.fields["Fix"] != "": - long_desc2 = "Fix:\n" + self.fields["Fix"] - if self.need_unformatted == 1 and self.fields["Unformatted"] != "": - long_desc += "\n\nUnformatted:\n" + self.fields["Unformatted"] - if long_desc != "": - self.long_descs.append((self.bug_id, self.fields["reporter"], - self.fields["creation_ts"], - SqlQuote(long_desc))) - if long_desc2 != "": - self.long_descs.append((self.bug_id, self.fields["reporter"], - self.fields["creation_ts"], - SqlQuote(long_desc2))) - for field in ["gcchost", "gccbuild", "gcctarget"]: - self.fields[field] = SqlQuote(self.fields[field]) - self.fields["version"] = "" - if self.fields["Release"] != "": - self.fields["version"] = convert_gccver_to_ver (self.fields["Release"]) - self.fields["version"] = SqlQuote(self.fields["version"]) - self.fields["product"] = SqlQuote("gcc") - self.fields["component"] = "invalid" - if self.fields.has_key("Category"): - self.fields["component"] = self.fields["Category"] - self.fields["component"] = SqlQuote(self.fields["component"]) - self.fields["target_milestone"] = "---" - if self.fields["version"].find("3.4") != -1: - self.fields["target_milestone"] = "3.4" - self.fields["target_milestone"] = SqlQuote(self.fields["target_milestone"]) - if self.fields["userid"] == 2: - self.fields["userid"] = "\'NULL\'" - -class GNATSbug(object): - """ Represents a single GNATS PR """ - def __init__(self, filename): - self.attachments = [] - self.has_unformatted_attach = 0 - fp = open (filename) - self.fields = self.parse_pr(fp.xreadlines()) - self.bug_id = int(self.fields["Number"]) - if self.fields.has_key("Unformatted"): - self.find_gnatsweb_attachments() - if self.fields.has_key("How-To-Repeat"): - self.find_regular_attachments("How-To-Repeat") - if self.fields.has_key("Fix"): - self.find_regular_attachments("Fix") - - def get_attacher(fields): - if fields.has_key("Reply-To") and fields["Reply-To"] != "": - return get_userid(fields["Reply-To"]) - else: - result = None - if fields.has_key("Mail-Header"): - result = re.search(r"""From .*?([\w.]+\@[\w.]+)""", - fields["Mail-Header"]) - if result is not None: - reporter = get_userid(result.group(1)) - else: - reporter = get_userid(gnats_username) - get_attacher = staticmethod(get_attacher) - def find_regular_attachments(self, which): - fields = self.fields - while re.search("^begin [0-7]{3}", fields[which], - re.DOTALL | re.MULTILINE): - outfp = cStringIO.StringIO() - infp = cStringIO.StringIO(fields[which]) - filename, start, end = specialuu.decode(infp, outfp, quiet=0) - fields[which]=fields[which].replace(fields[which][start:end], - "See attachments for %s\n" % filename) - self.attachments.append((filename, outfp.getvalue(), - self.get_attacher(fields))) - - def decode_gnatsweb_attachment(self, attachment): - result = re.split(r"""\n\n""", attachment, 1) - if len(result) == 1: - return -1 - envelope, body = result - envelope = uselessre.split(envelope) - envelope.pop(0) - # Turn the list of key, value into a dict of key => value - attachinfo = dict([(envelope[i], envelope[i+1]) for i in xrange(0,len(envelope),2)]) - for x in attachinfo.keys(): - attachinfo[x] = attachinfo[x].rstrip() - if (attachinfo.has_key("Content-Type") == 0) or \ - (attachinfo.has_key("Content-Disposition") == 0): - raise ValueError, "Unable to parse file attachment" - result = dispositionre.search(attachinfo["Content-Disposition"]) - filename = result.group(2) - filename = re.sub(".*/","", filename) - filename = re.sub(".*\\\\","", filename) - attachinfo["filename"]=filename - result = re.search("""(\S+);.*""", attachinfo["Content-Type"]) - if result is not None: - attachinfo["Content-Type"] = result.group(1) - if attachinfo.has_key("Content-Transfer-Encoding"): - if attachinfo["Content-Transfer-Encoding"] == "base64": - attachinfo["data"] = base64.decodestring(body) - else: - attachinfo["data"]=body - - return (attachinfo["filename"], attachinfo["data"], - self.get_attacher(self.fields)) - - def find_gnatsweb_attachments(self): - fields = self.fields - attachments = re.split(attachment_delimiter, fields["Unformatted"]) - fields["Unformatted"] = attachments.pop(0) - for attachment in attachments: - result = self.decode_gnatsweb_attachment (attachment) - if result != -1: - self.attachments.append(result) - self.has_unformatted_attach = 1 - def parse_pr(lines): - #fields = {"envelope":[]} - fields = {"envelope":array.array("c")} - hdrmulti = "envelope" - for line in lines: - line = line.rstrip('\n') - line += '\n' - result = gnatfieldre.search(line) - if result is None: - if hdrmulti != "": - if fields.has_key(hdrmulti): - #fields[hdrmulti].append(line) - fields[hdrmulti].fromstring(line) - else: - #fields[hdrmulti] = [line] - fields[hdrmulti] = array.array("c", line) - continue - hdr, arg = result.groups() - ghdr = "*not valid*" - result = fieldnamere.search(hdr) - if result != None: - ghdr = result.groups()[0] - if ghdr in fieldnames: - if multilinefields.has_key(ghdr): - hdrmulti = ghdr - #fields[ghdr] = [""] - fields[ghdr] = array.array("c") - else: - hdrmulti = "" - #fields[ghdr] = [arg] - fields[ghdr] = array.array("c", arg) - elif hdrmulti != "": - #fields[hdrmulti].append(line) - fields[hdrmulti].fromstring(line) - if hdrmulti == "envelope" and \ - (hdr == "Reply-To" or hdr == "From" \ - or hdr == "X-GNATS-Notify"): - arg = fix_email_addrs(arg) - #fields[hdr] = [arg] - fields[hdr] = array.array("c", arg) - if fields.has_key("Reply-To") and len(fields["Reply-To"]) > 0: - fields["Reply-To"] = fields["Reply-To"] - else: - fields["Reply-To"] = fields["From"] - if fields.has_key("From"): - del fields["From"] - if fields.has_key("X-GNATS-Notify") == 0: - fields["X-GNATS-Notify"] = array.array("c") - #fields["X-GNATS-Notify"] = "" - for x in fields.keys(): - fields[x] = fields[x].tostring() - #fields[x] = "".join(fields[x]) - for x in fields.keys(): - if multilinefields.has_key(x): - fields[x] = fields[x].rstrip() - - return fields - parse_pr = staticmethod(parse_pr) -load_index("%s/gnats-adm/index" % gnats_db_dir) -load_categories("%s/gnats-adm/categories" % gnats_db_dir) -load_responsible("%s/gnats-adm/responsible" % gnats_db_dir) -get_userid(gnats_username) -get_userid(unassigned_username) -for x in pr_list: - print "Processing %s..." % x - a = GNATSbug ("%s/%s" % (gnats_db_dir, x)) - b = Bugzillabug(a) -write_non_bug_tables() -outfile.close() diff --git a/contrib/gnatsparse/magic.py b/contrib/gnatsparse/magic.py deleted file mode 100755 index 049a7e19b..000000000 --- a/contrib/gnatsparse/magic.py +++ /dev/null @@ -1,712 +0,0 @@ -# Found on a russian zope mailing list, and modified to fix bugs in parsing -# the magic file and string making -# -- Daniel Berlin <dberlin@dberlin.org> -import sys, struct, time, re, exceptions, pprint, stat, os, pwd, grp - -_mew = 0 - -# _magic='/tmp/magic' -# _magic='/usr/share/magic.mime' -_magic='/usr/share/magic.mime' -mime = 1 - -_ldate_adjust = lambda x: time.mktime( time.gmtime(x) ) - -BUFFER_SIZE = 1024 * 128 # 128K should be enough... - -class MagicError(exceptions.Exception): pass - -def _handle(fmt='@x',adj=None): return fmt, struct.calcsize(fmt), adj - -KnownTypes = { - # 'byte':_handle('@b'), - 'byte':_handle('@B'), - 'ubyte':_handle('@B'), - - 'string':('s',0,None), - 'pstring':_handle('p'), - -# 'short':_handle('@h'), -# 'beshort':_handle('>h'), -# 'leshort':_handle('<h'), - 'short':_handle('@H'), - 'beshort':_handle('>H'), - 'leshort':_handle('<H'), - 'ushort':_handle('@H'), - 'ubeshort':_handle('>H'), - 'uleshort':_handle('<H'), - - 'long':_handle('@l'), - 'belong':_handle('>l'), - 'lelong':_handle('<l'), - 'ulong':_handle('@L'), - 'ubelong':_handle('>L'), - 'ulelong':_handle('<L'), - - 'date':_handle('=l'), - 'bedate':_handle('>l'), - 'ledate':_handle('<l'), - 'ldate':_handle('=l',_ldate_adjust), - 'beldate':_handle('>l',_ldate_adjust), - 'leldate':_handle('<l',_ldate_adjust), -} - -_mew_cnt = 0 -def mew(x): - global _mew_cnt - if _mew : - if x=='.' : - _mew_cnt += 1 - if _mew_cnt % 64 == 0 : sys.stderr.write( '\n' ) - sys.stderr.write( '.' ) - else: - sys.stderr.write( '\b'+x ) - -def has_format(s): - n = 0 - l = None - for c in s : - if c == '%' : - if l == '%' : n -= 1 - else : n += 1 - l = c - return n - -def read_asciiz(file,size=None,pos=None): - s = [] - if pos : - mew('s') - file.seek( pos, 0 ) - mew('z') - if size is not None : - s = [file.read( size ).split('\0')[0]] - else: - while 1 : - c = file.read(1) - if (not c) or (ord(c)==0) or (c=='\n') : break - s.append (c) - mew('Z') - return ''.join(s) - -def a2i(v,base=0): - if v[-1:] in 'lL' : v = v[:-1] - return int( v, base ) - -_cmap = { - '\\' : '\\', - '0' : '\0', -} -for c in range(ord('a'),ord('z')+1) : - try : e = eval('"\\%c"' % chr(c)) - except ValueError : pass - else : _cmap[chr(c)] = e -else: - del c - del e - -def make_string(s): - return eval( '"'+s.replace('"','\\"')+'"') - -class MagicTestError(MagicError): pass - -class MagicTest: - def __init__(self,offset,mtype,test,message,line=None,level=None): - self.line, self.level = line, level - self.mtype = mtype - self.mtest = test - self.subtests = [] - self.mask = None - self.smod = None - self.nmod = None - self.offset, self.type, self.test, self.message = \ - offset,mtype,test,message - if self.mtype == 'true' : return # XXX hack to enable level skips - if test[-1:]=='\\' and test[-2:]!='\\\\' : - self.test += 'n' # looks like someone wanted EOL to match? - if mtype[:6]=='string' : - if '/' in mtype : # for strings - self.type, self.smod = \ - mtype[:mtype.find('/')], mtype[mtype.find('/')+1:] - else: - for nm in '&+-' : - if nm in mtype : # for integer-based - self.nmod, self.type, self.mask = ( - nm, - mtype[:mtype.find(nm)], - # convert mask to int, autodetect base - int( mtype[mtype.find(nm)+1:], 0 ) - ) - break - self.struct, self.size, self.cast = KnownTypes[ self.type ] - def __str__(self): - return '%s %s %s %s' % ( - self.offset, self.mtype, self.mtest, self.message - ) - def __repr__(self): - return 'MagicTest(%s,%s,%s,%s,line=%s,level=%s,subtests=\n%s%s)' % ( - `self.offset`, `self.mtype`, `self.mtest`, `self.message`, - `self.line`, `self.level`, - '\t'*self.level, pprint.pformat(self.subtests) - ) - def run(self,file): - result = '' - do_close = 0 - try: - if type(file) == type('x') : - file = open( file, 'r', BUFFER_SIZE ) - do_close = 1 -# else: -# saved_pos = file.tell() - if self.mtype != 'true' : - data = self.read(file) - last = file.tell() - else: - data = last = None - if self.check( data ) : - result = self.message+' ' - if has_format( result ) : result %= data - for test in self.subtests : - m = test.run(file) - if m is not None : result += m - return make_string( result ) - finally: - if do_close : - file.close() -# else: -# file.seek( saved_pos, 0 ) - def get_mod_and_value(self): - if self.type[-6:] == 'string' : - # "something like\tthis\n" - if self.test[0] in '=<>' : - mod, value = self.test[0], make_string( self.test[1:] ) - else: - mod, value = '=', make_string( self.test ) - else: - if self.test[0] in '=<>&^' : - mod, value = self.test[0], a2i(self.test[1:]) - elif self.test[0] == 'x': - mod = self.test[0] - value = 0 - else: - mod, value = '=', a2i(self.test) - return mod, value - def read(self,file): - mew( 's' ) - file.seek( self.offset(file), 0 ) # SEEK_SET - mew( 'r' ) - try: - data = rdata = None - # XXX self.size might be 0 here... - if self.size == 0 : - # this is an ASCIIZ string... - size = None - if self.test != '>\\0' : # magic's hack for string read... - value = self.get_mod_and_value()[1] - size = (value=='\0') and None or len(value) - rdata = data = read_asciiz( file, size=size ) - else: - rdata = file.read( self.size ) - if not rdata or (len(rdata)!=self.size) : return None - data = struct.unpack( self.struct, rdata )[0] # XXX hack?? - except: - print >>sys.stderr, self - print >>sys.stderr, '@%s struct=%s size=%d rdata=%s' % ( - self.offset, `self.struct`, self.size,`rdata`) - raise - mew( 'R' ) - if self.cast : data = self.cast( data ) - if self.mask : - try: - if self.nmod == '&' : data &= self.mask - elif self.nmod == '+' : data += self.mask - elif self.nmod == '-' : data -= self.mask - else: raise MagicTestError(self.nmod) - except: - print >>sys.stderr,'data=%s nmod=%s mask=%s' % ( - `data`, `self.nmod`, `self.mask` - ) - raise - return data - def check(self,data): - mew('.') - if self.mtype == 'true' : - return '' # not None ! - mod, value = self.get_mod_and_value() - if self.type[-6:] == 'string' : - # "something like\tthis\n" - if self.smod : - xdata = data - if 'b' in self.smod : # all blanks are optional - xdata = ''.join( data.split() ) - value = ''.join( value.split() ) - if 'c' in self.smod : # all blanks are optional - xdata = xdata.upper() - value = value.upper() - # if 'B' in self.smod : # compact blanks - ### XXX sorry, i don't understand this :-( - # data = ' '.join( data.split() ) - # if ' ' not in data : return None - else: - xdata = data - try: - if mod == '=' : result = data == value - elif mod == '<' : result = data < value - elif mod == '>' : result = data > value - elif mod == '&' : result = data & value - elif mod == '^' : result = (data & (~value)) == 0 - elif mod == 'x' : result = 1 - else : raise MagicTestError(self.test) - if result : - zdata, zval = `data`, `value` - if self.mtype[-6:]!='string' : - try: zdata, zval = hex(data), hex(value) - except: zdata, zval = `data`, `value` - if 0 : print >>sys.stderr, '%s @%s %s:%s %s %s => %s (%s)' % ( - '>'*self.level, self.offset, - zdata, self.mtype, `mod`, zval, `result`, - self.message - ) - return result - except: - print >>sys.stderr,'mtype=%s data=%s mod=%s value=%s' % ( - `self.mtype`, `data`, `mod`, `value` - ) - raise - def add(self,mt): - if not isinstance(mt,MagicTest) : - raise MagicTestError((mt,'incorrect subtest type %s'%(type(mt),))) - if mt.level == self.level+1 : - self.subtests.append( mt ) - elif self.subtests : - self.subtests[-1].add( mt ) - elif mt.level > self.level+1 : - # it's possible to get level 3 just after level 1 !!! :-( - level = self.level + 1 - while level < mt.level : - xmt = MagicTest(None,'true','x','',line=self.line,level=level) - self.add( xmt ) - level += 1 - else: - self.add( mt ) # retry... - else: - raise MagicTestError((mt,'incorrect subtest level %s'%(`mt.level`,))) - def last_test(self): - return self.subtests[-1] -#end class MagicTest - -class OffsetError(MagicError): pass - -class Offset: - pos_format = {'b':'<B','B':'>B','s':'<H','S':'>H','l':'<I','L':'>I',} - pattern0 = re.compile(r''' # mere offset - ^ - &? # possible ampersand - ( 0 # just zero - | [1-9]{1,1}[0-9]* # decimal - | 0[0-7]+ # octal - | 0x[0-9a-f]+ # hex - ) - $ - ''', re.X|re.I - ) - pattern1 = re.compile(r''' # indirect offset - ^\( - (?P<base>&?0 # just zero - |&?[1-9]{1,1}[0-9]* # decimal - |&?0[0-7]* # octal - |&?0x[0-9A-F]+ # hex - ) - (?P<type> - \. # this dot might be alone - [BSL]? # one of this chars in either case - )? - (?P<sign> - [-+]{0,1} - )? - (?P<off>0 # just zero - |[1-9]{1,1}[0-9]* # decimal - |0[0-7]* # octal - |0x[0-9a-f]+ # hex - )? - \)$''', re.X|re.I - ) - def __init__(self,s): - self.source = s - self.value = None - self.relative = 0 - self.base = self.type = self.sign = self.offs = None - m = Offset.pattern0.match( s ) - if m : # just a number - if s[0] == '&' : - self.relative, self.value = 1, int( s[1:], 0 ) - else: - self.value = int( s, 0 ) - return - m = Offset.pattern1.match( s ) - if m : # real indirect offset - try: - self.base = m.group('base') - if self.base[0] == '&' : - self.relative, self.base = 1, int( self.base[1:], 0 ) - else: - self.base = int( self.base, 0 ) - if m.group('type') : self.type = m.group('type')[1:] - self.sign = m.group('sign') - if m.group('off') : self.offs = int( m.group('off'), 0 ) - if self.sign == '-' : self.offs = 0 - self.offs - except: - print >>sys.stderr, '$$', m.groupdict() - raise - return - raise OffsetError(`s`) - def __call__(self,file=None): - if self.value is not None : return self.value - pos = file.tell() - try: - if not self.relative : file.seek( self.offset, 0 ) - frmt = Offset.pos_format.get( self.type, 'I' ) - size = struct.calcsize( frmt ) - data = struct.unpack( frmt, file.read( size ) ) - if self.offs : data += self.offs - return data - finally: - file.seek( pos, 0 ) - def __str__(self): return self.source - def __repr__(self): return 'Offset(%s)' % `self.source` -#end class Offset - -class MagicFileError(MagicError): pass - -class MagicFile: - def __init__(self,filename=_magic): - self.file = None - self.tests = [] - self.total_tests = 0 - self.load( filename ) - self.ack_tests = None - self.nak_tests = None - def __del__(self): - self.close() - def load(self,filename=None): - self.open( filename ) - self.parse() - self.close() - def open(self,filename=None): - self.close() - if filename is not None : - self.filename = filename - self.file = open( self.filename, 'r', BUFFER_SIZE ) - def close(self): - if self.file : - self.file.close() - self.file = None - def parse(self): - line_no = 0 - for line in self.file.xreadlines() : - line_no += 1 - if not line or line[0]=='#' : continue - line = line.lstrip().rstrip('\r\n') - if not line or line[0]=='#' : continue - try: - x = self.parse_line( line ) - if x is None : - print >>sys.stderr, '#[%04d]#'%line_no, line - continue - except: - print >>sys.stderr, '###[%04d]###'%line_no, line - raise - self.total_tests += 1 - level, offset, mtype, test, message = x - new_test = MagicTest(offset,mtype,test,message, - line=line_no,level=level) - try: - if level == 0 : - self.tests.append( new_test ) - else: - self.tests[-1].add( new_test ) - except: - if 1 : - print >>sys.stderr, 'total tests=%s' % ( - `self.total_tests`, - ) - print >>sys.stderr, 'level=%s' % ( - `level`, - ) - print >>sys.stderr, 'tests=%s' % ( - pprint.pformat(self.tests), - ) - raise - else: - while self.tests[-1].level > 0 : - self.tests.pop() - def parse_line(self,line): - # print >>sys.stderr, 'line=[%s]' % line - if (not line) or line[0]=='#' : return None - level = 0 - offset = mtype = test = message = '' - mask = None - # get optional level (count leading '>') - while line and line[0]=='>' : - line, level = line[1:], level+1 - # get offset - while line and not line[0].isspace() : - offset, line = offset+line[0], line[1:] - try: - offset = Offset(offset) - except: - print >>sys.stderr, 'line=[%s]' % line - raise - # skip spaces - line = line.lstrip() - # get type - c = None - while line : - last_c, c, line = c, line[0], line[1:] - if last_c!='\\' and c.isspace() : - break # unescaped space - end of field - else: - mtype += c - if last_c == '\\' : - c = None # don't fuck my brain with sequential backslashes - # skip spaces - line = line.lstrip() - # get test - c = None - while line : - last_c, c, line = c, line[0], line[1:] - if last_c!='\\' and c.isspace() : - break # unescaped space - end of field - else: - test += c - if last_c == '\\' : - c = None # don't fuck my brain with sequential backslashes - # skip spaces - line = line.lstrip() - # get message - message = line - if mime and line.find("\t") != -1: - message=line[0:line.find("\t")] - # - # print '>>', level, offset, mtype, test, message - return level, offset, mtype, test, message - def detect(self,file): - self.ack_tests = 0 - self.nak_tests = 0 - answers = [] - for test in self.tests : - message = test.run( file ) - if message : - self.ack_tests += 1 - answers.append( message ) - else: - self.nak_tests += 1 - if answers : - return '; '.join( answers ) -#end class MagicFile - -def username(uid): - try: - return pwd.getpwuid( uid )[0] - except: - return '#%s'%uid - -def groupname(gid): - try: - return grp.getgrgid( gid )[0] - except: - return '#%s'%gid - -def get_file_type(fname,follow): - t = None - if not follow : - try: - st = os.lstat( fname ) # stat that entry, don't follow links! - except os.error, why : - pass - else: - if stat.S_ISLNK(st[stat.ST_MODE]) : - t = 'symbolic link' - try: - lnk = os.readlink( fname ) - except: - t += ' (unreadable)' - else: - t += ' to '+lnk - if t is None : - try: - st = os.stat( fname ) - except os.error, why : - return "can't stat `%s' (%s)." % (why.filename,why.strerror) - - dmaj, dmin = (st.st_rdev>>8)&0x0FF, st.st_rdev&0x0FF - - if 0 : pass - elif stat.S_ISSOCK(st.st_mode) : t = 'socket' - elif stat.S_ISLNK (st.st_mode) : t = follow and 'symbolic link' or t - elif stat.S_ISREG (st.st_mode) : t = 'file' - elif stat.S_ISBLK (st.st_mode) : t = 'block special (%d/%d)'%(dmaj,dmin) - elif stat.S_ISDIR (st.st_mode) : t = 'directory' - elif stat.S_ISCHR (st.st_mode) : t = 'character special (%d/%d)'%(dmaj,dmin) - elif stat.S_ISFIFO(st.st_mode) : t = 'pipe' - else: t = '<unknown>' - - if st.st_mode & stat.S_ISUID : - t = 'setuid(%d=%s) %s'%(st.st_uid,username(st.st_uid),t) - if st.st_mode & stat.S_ISGID : - t = 'setgid(%d=%s) %s'%(st.st_gid,groupname(st.st_gid),t) - if st.st_mode & stat.S_ISVTX : - t = 'sticky '+t - - return t - -HELP = '''%s [options] [files...] - -Options: - - -?, --help -- this help - -m, --magic=<file> -- use this magic <file> instead of %s - -f, --files=<namefile> -- read filenames for <namefile> -* -C, --compile -- write "compiled" magic file - -b, --brief -- don't prepend filenames to output lines -+ -c, --check -- check the magic file - -i, --mime -- output MIME types -* -k, --keep-going -- don't stop st the first match - -n, --flush -- flush stdout after each line - -v, --verson -- print version and exit -* -z, --compressed -- try to look inside compressed files - -L, --follow -- follow symlinks - -s, --special -- don't skip special files - -* -- not implemented so far ;-) -+ -- implemented, but in another way... -''' - -def main(): - import getopt - global _magic - try: - brief = 0 - flush = 0 - follow= 0 - mime = 0 - check = 0 - special=0 - try: - opts, args = getopt.getopt( - sys.argv[1:], - '?m:f:CbciknvzLs', - ( 'help', - 'magic=', - 'names=', - 'compile', - 'brief', - 'check', - 'mime', - 'keep-going', - 'flush', - 'version', - 'compressed', - 'follow', - 'special', - ) - ) - except getopt.error, why: - print >>sys.stderr, sys.argv[0], why - return 1 - else: - files = None - for o,v in opts : - if o in ('-?','--help'): - print HELP % ( - sys.argv[0], - _magic, - ) - return 0 - elif o in ('-f','--files='): - files = v - elif o in ('-m','--magic='): - _magic = v[:] - elif o in ('-C','--compile'): - pass - elif o in ('-b','--brief'): - brief = 1 - elif o in ('-c','--check'): - check = 1 - elif o in ('-i','--mime'): - mime = 1 - if os.path.exists( _magic+'.mime' ) : - _magic += '.mime' - print >>sys.stderr,sys.argv[0]+':',\ - "Using regular magic file `%s'" % _magic - elif o in ('-k','--keep-going'): - pass - elif o in ('-n','--flush'): - flush = 1 - elif o in ('-v','--version'): - print 'VERSION' - return 0 - elif o in ('-z','--compressed'): - pass - elif o in ('-L','--follow'): - follow = 1 - elif o in ('-s','--special'): - special = 1 - else: - if files : - files = map(lambda x: x.strip(), v.split(',')) - if '-' in files and '-' in args : - error( 1, 'cannot use STDIN simultaneously for file list and data' ) - for file in files : - for name in ( - (file=='-') - and sys.stdin - or open(file,'r',BUFFER_SIZE) - ).xreadlines(): - name = name.strip() - if name not in args : - args.append( name ) - try: - if check : print >>sys.stderr, 'Loading magic database...' - t0 = time.time() - m = MagicFile(_magic) - t1 = time.time() - if check : - print >>sys.stderr, \ - m.total_tests, 'tests loaded', \ - 'for', '%.2f' % (t1-t0), 'seconds' - print >>sys.stderr, len(m.tests), 'tests at top level' - return 0 # XXX "shortened" form ;-) - - mlen = max( map(len, args) )+1 - for arg in args : - if not brief : print (arg + ':').ljust(mlen), - ftype = get_file_type( arg, follow ) - if (special and ftype.find('special')>=0) \ - or ftype[-4:] == 'file' : - t0 = time.time() - try: - t = m.detect( arg ) - except (IOError,os.error), why: - t = "can't read `%s' (%s)" % (why.filename,why.strerror) - if ftype[-4:] == 'file' : t = ftype[:-4] + t - t1 = time.time() - print t and t or 'data' - if 0 : print \ - '#\t%d tests ok, %d tests failed for %.2f seconds'%\ - (m.ack_tests, m.nak_tests, t1-t0) - else: - print mime and 'application/x-not-regular-file' or ftype - if flush : sys.stdout.flush() - # print >>sys.stderr, 'DONE' - except: - if check : return 1 - raise - else: - return 0 - finally: - pass - -if __name__ == '__main__' : - sys.exit( main() ) -# vim:ai -# EOF # diff --git a/contrib/gnatsparse/specialuu.py b/contrib/gnatsparse/specialuu.py deleted file mode 100755 index b729d9c59..000000000 --- a/contrib/gnatsparse/specialuu.py +++ /dev/null @@ -1,104 +0,0 @@ -#! /usr/bin/env python2.2 - -# Copyright 1994 by Lance Ellinghouse -# Cathedral City, California Republic, United States of America. -# All Rights Reserved -# Permission to use, copy, modify, and distribute this software and its -# documentation for any purpose and without fee is hereby granted, -# provided that the above copyright notice appear in all copies and that -# both that copyright notice and this permission notice appear in -# supporting documentation, and that the name of Lance Ellinghouse -# not be used in advertising or publicity pertaining to distribution -# of the software without specific, written prior permission. -# LANCE ELLINGHOUSE DISCLAIMS ALL WARRANTIES WITH REGARD TO -# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND -# FITNESS, IN NO EVENT SHALL LANCE ELLINGHOUSE CENTRUM BE LIABLE -# FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES -# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN -# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT -# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -# -# Modified by Jack Jansen, CWI, July 1995: -# - Use binascii module to do the actual line-by-line conversion -# between ascii and binary. This results in a 1000-fold speedup. The C -# version is still 5 times faster, though. -# - Arguments more compliant with python standard - -"""Implementation of the UUencode and UUdecode functions. - -encode(in_file, out_file [,name, mode]) -decode(in_file [, out_file, mode]) -""" - -import binascii -import os -import sys -from types import StringType - -__all__ = ["Error", "decode"] - -class Error(Exception): - pass - -def decode(in_file, out_file=None, mode=None, quiet=0): - """Decode uuencoded file""" - # - # Open the input file, if needed. - # - if in_file == '-': - in_file = sys.stdin - elif isinstance(in_file, StringType): - in_file = open(in_file) - # - # Read until a begin is encountered or we've exhausted the file - # - while 1: - hdr = in_file.readline() - if not hdr: - raise Error, 'No valid begin line found in input file' - if hdr[:5] != 'begin': - continue - hdrfields = hdr.split(" ", 2) - if len(hdrfields) == 3 and hdrfields[0] == 'begin': - try: - int(hdrfields[1], 8) - start_pos = in_file.tell() - len (hdr) - break - except ValueError: - pass - if out_file is None: - out_file = hdrfields[2].rstrip() - if os.path.exists(out_file): - raise Error, 'Cannot overwrite existing file: %s' % out_file - if mode is None: - mode = int(hdrfields[1], 8) - # - # Open the output file - # - if out_file == '-': - out_file = sys.stdout - elif isinstance(out_file, StringType): - fp = open(out_file, 'wb') - try: - os.path.chmod(out_file, mode) - except AttributeError: - pass - out_file = fp - # - # Main decoding loop - # - s = in_file.readline() - while s and s.strip() != 'end': - try: - data = binascii.a2b_uu(s) - except binascii.Error, v: - # Workaround for broken uuencoders by /Fredrik Lundh - nbytes = (((ord(s[0])-32) & 63) * 4 + 5) / 3 - data = binascii.a2b_uu(s[:nbytes]) - if not quiet: - sys.stderr.write("Warning: %s\n" % str(v)) - out_file.write(data) - s = in_file.readline() -# if not s: - # raise Error, 'Truncated input file' - return (hdrfields[2].rstrip(), start_pos, in_file.tell()) |