summaryrefslogtreecommitdiffstats
path: root/contrib/gnatsparse
diff options
context:
space:
mode:
authorMax Kanat-Alexander <mkanat@bugzilla.org>2010-02-01 22:39:54 +0100
committerMax Kanat-Alexander <mkanat@bugzilla.org>2010-02-01 22:39:54 +0100
commitd495a972854500ce323f15d024605ec395fab155 (patch)
tree841efc7d2bf92cfd90098b6a32b1d80e52c1ac4d /contrib/gnatsparse
parenta456dea4447c9ddd1e79e04b2456740de19ce112 (diff)
downloadbugzilla-d495a972854500ce323f15d024605ec395fab155.tar.gz
bugzilla-d495a972854500ce323f15d024605ec395fab155.tar.xz
Fix the data in the bzr repo to match the data in the CVS repo.
During the CVS imports into Bzr, there were some inconsistencies introduced (mostly that files that were deleted in CVS weren't being deleted in Bzr). So this checkin makes the bzr repo actually consistent with the CVS repo, including fixing permissions of files.
Diffstat (limited to 'contrib/gnatsparse')
-rwxr-xr-xcontrib/gnatsparse/README62
-rwxr-xr-xcontrib/gnatsparse/gnatsparse.py807
-rwxr-xr-xcontrib/gnatsparse/magic.py712
-rwxr-xr-xcontrib/gnatsparse/specialuu.py104
4 files changed, 0 insertions, 1685 deletions
diff --git a/contrib/gnatsparse/README b/contrib/gnatsparse/README
deleted file mode 100755
index f7cf01c68..000000000
--- a/contrib/gnatsparse/README
+++ /dev/null
@@ -1,62 +0,0 @@
-gnatsparse
-==========
-
-Author: Daniel Berlin <dan@dberlin.org>
-
-gnatsparse is a simple Python program that imports a GNATS database
-into a Bugzilla system. It is based on the gnats2bz.pl Perl script
-but it's a rewrite at the same time. Its parser is based on gnatsweb,
-which gives a 10 times speed improvement compared to the previous code.
-
-Features
---------
-
-* Chunks audit trail into separate comments, with the right From's, times, etc.
-
-* Handles followup emails that are in the report, with the right From's, times,
-etc.
-
-* Properly handles duplicates, adding the standard bugzilla duplicate message.
-
-* Extracts and handles gnatsweb attachments, as well as uuencoded attachments
-appearing in either followup emails, the how-to-repeat field, etc. Replaces
-them with a message to look at the attachments list, and adds the standard
-"Created an attachment" message that bugzilla uses. Handling them includes
-giving them the right name and mime-type. "attachments" means multiple
-uuencoded things/gnatsweb attachments are handled properly.
-
-* Handles reopened bug reports.
-
-* Builds the cc list from the people who have commented on the report,
-and the reporter.
-
-Requirements
-------------
-
-It requires python 2.2+, it won't work with 1.5.2 (Linux distributions
-ship with 2.2+ these days, so that shouldn't be an issue).
-
-Documentation
--------------
-
-Documentation can be found inside the scripts. The source code is self
-documenting.
-
-Issues for someone trying to use it to convert a gnats install
------------------------------------
-
-1. We have three custom fields bugzilla doesn't ship with,
-gcchost, gcctarget, and gccbuild.
-We removed two bugzilla fields, rep_platform and op_sys.
-If you use the latter instead of the former, you'll need to
-update the script to account for this.
-2. Because gcc attachments consist of preprocessed source, all attachments
-inserted into the attachment database are compressed with zlib.compress.
-This requires associated bugzilla changes to decompress before sending to
-the browser.
-Unless you want to make those changes (it's roughly 3 lines), you'll
-need to remove the zlib.compress call.
-3. You will need to come up with your own release to version mapping and
-install it.
-4. Obviously, any extra gnats fields you have added will have to
-be handled in some manner.
diff --git a/contrib/gnatsparse/gnatsparse.py b/contrib/gnatsparse/gnatsparse.py
deleted file mode 100755
index 9eef18b89..000000000
--- a/contrib/gnatsparse/gnatsparse.py
+++ /dev/null
@@ -1,807 +0,0 @@
-try:
-# Using Psyco makes it about 25% faster, but there's a bug in psyco in
-# handling of eval causing it to use unlimited memory with the magic
-# file enabled.
-# import psyco
-# psyco.full()
-# from psyco.classes import *
- pass
-except:
- pass
-import re
-import base64
-import cStringIO
-import specialuu
-import array
-import email.Utils
-import zlib
-import magic
-
-# Comment out if you don't want magic detection
-magicf = magic.MagicFile()
-
-# Open our output file
-outfile = open("gnats2bz_data.sql", "w")
-
-# List of GNATS fields
-fieldnames = ("Number", "Category", "Synopsis", "Confidential", "Severity",
- "Priority", "Responsible", "State", "Quarter", "Keywords",
- "Date-Required", "Class", "Submitter-Id", "Arrival-Date",
- "Closed-Date", "Last-Modified", "Originator", "Release",
- "Organization", "Environment", "Description", "How-To-Repeat",
- "Fix", "Release-Note", "Audit-Trail", "Unformatted")
-
-# Dictionary telling us which GNATS fields are multiline
-multilinefields = {"Organization":1, "Environment":1, "Description":1,
- "How-To-Repeat":1, "Fix":1, "Release-Note":1,
- "Audit-Trail":1, "Unformatted":1}
-
-# Mapping of GCC release to version. Our version string is updated every
-# so we need to funnel all release's with 3.4 in the string to be version
-# 3.4 for bug tracking purposes
-# The key is a regex to match, the value is the version it corresponds
-# with
-releasetovermap = {r"3\.4":"3.4", r"3\.3":"3.3", r"3\.2\.2":"3.2.2",
- r"3\.2\.1":"3.2.1", r"3\.2":"3.2", r"3\.1\.2":"3.1.2",
- r"3\.1\.1":"3.1.1", r"3\.1":"3.1", r"3\.0\.4":"3.0.4",
- r"3\.0\.3":"3.0.3", r"3\.0\.2":"3.0.2", r"3\.0\.1":"3.0.1",
- r"3\.0":"3.0", r"2\.95\.4":"2.95.4", r"2\.95\.3":"2.95.3",
- r"2\.95\.2":"2.95.2", r"2\.95\.1":"2.95.1",
- r"2\.95":"2.95", r"2\.97":"2.97",
- r"2\.96.*[rR][eE][dD].*[hH][aA][tT]":"2.96 (redhat)",
- r"2\.96":"2.96"}
-
-# These map the field name to the field id bugzilla assigns. We need
-# the id when doing bug activity.
-fieldids = {"State":8, "Responsible":15}
-
-# These are the keywords we use in gcc bug tracking. They are transformed
-# into bugzilla keywords. The format here is <keyword>-><bugzilla keyword id>
-keywordids = {"wrong-code":1, "ice-on-legal-code":2, "ice-on-illegal-code":3,
- "rejects-legal":4, "accepts-illegal":5, "pessimizes-code":6}
-
-# Map from GNATS states to Bugzilla states. Duplicates and reopened bugs
-# are handled when parsing the audit trail, so no need for them here.
-state_lookup = {"":"NEW", "open":"ASSIGNED", "analyzed":"ASSIGNED",
- "feedback":"WAITING", "closed":"CLOSED",
- "suspended":"SUSPENDED"}
-
-# Table of versions that exist in the bugs, built up as we go along
-versions_table = {}
-
-# Delimiter gnatsweb uses for attachments
-attachment_delimiter = "----gnatsweb-attachment----\n"
-
-# Here starts the various regular expressions we use
-# Matches an entire GNATS single line field
-gnatfieldre = re.compile(r"""^([>\w\-]+)\s*:\s*(.*)\s*$""")
-
-# Matches the name of a GNATS field
-fieldnamere = re.compile(r"""^>(.*)$""")
-
-# Matches the useless part of an envelope
-uselessre = re.compile(r"""^(\S*?):\s*""", re.MULTILINE)
-
-# Matches the filename in a content disposition
-dispositionre = re.compile("(\\S+);\\s*filename=\"([^\"]+)\"")
-
-# Matches the last changed date in the entire text of a bug
-# If you have other editable fields that get audit trail entries, modify this
-# The field names are explicitly listed in order to speed up matching
-lastdatere = re.compile(r"""^(?:(?:State|Responsible|Priority|Severity)-Changed-When: )(.+?)$""", re.MULTILINE)
-
-# Matches the From line of an email or the first line of an audit trail entry
-# We use this re to find the begin lines of all the audit trail entries
-# The field names are explicitly listed in order to speed up matching
-fromtore=re.compile(r"""^(?:(?:State|Responsible|Priority|Severity)-Changed-From-To: |From: )""", re.MULTILINE)
-
-# These re's match the various parts of an audit trail entry
-changedfromtore=re.compile(r"""^(\w+?)-Changed-From-To: (.+?)$""", re.MULTILINE)
-changedbyre=re.compile(r"""^\w+?-Changed-By: (.+?)$""", re.MULTILINE)
-changedwhenre=re.compile(r"""^\w+?-Changed-When: (.+?)$""", re.MULTILINE)
-changedwhyre=re.compile(r"""^\w+?-Changed-Why:\s*(.*?)$""", re.MULTILINE)
-
-# This re matches audit trail text saying that the current bug is a duplicate of another
-duplicatere=re.compile(r"""(?:")?Dup(?:licate)?(?:d)?(?:")? of .*?(\d+)""", re.IGNORECASE | re.MULTILINE)
-
-# Get the text of a From: line
-fromre=re.compile(r"""^From: (.*?)$""", re.MULTILINE)
-
-# Get the text of a Date: Line
-datere=re.compile(r"""^Date: (.*?)$""", re.MULTILINE)
-
-# Map of the responsible file to email addresses
-responsible_map = {}
-# List of records in the responsible file
-responsible_list = []
-# List of records in the categories file
-categories_list = []
-# List of pr's in the index
-pr_list = []
-# Map usernames to user ids
-usermapping = {}
-# Start with this user id
-userid_base = 2
-
-# Name of gnats user
-gnats_username = "gnats@gcc.gnu.org"
-# Name of unassigned user
-unassigned_username = "unassigned@gcc.gnu.org"
-
-gnats_db_dir = "."
-product = "gcc"
-productdesc = "GNU Compiler Connection"
-milestoneurl = "http://gcc/gnu.org"
-defaultmilestone = "3.4"
-
-def write_non_bug_tables():
- """ Write out the non-bug related tables, such as products, profiles, etc."""
- # Set all non-unconfirmed bugs's everconfirmed flag
- print >>outfile, "update bugs set everconfirmed=1 where bug_status != 'UNCONFIRMED';"
-
- # Set all bugs assigned to the unassigned user to NEW
- print >>outfile, "update bugs set bug_status='NEW',assigned_to='NULL' where bug_status='ASSIGNED' AND assigned_to=3;"
-
- # Insert the products
- print >>outfile, "\ninsert into products ("
- print >>outfile, " product, description, milestoneurl, isactive,"
- print >>outfile, " defaultmilestone, votestoconfirm) values ("
- print >>outfile, " '%s', '%s', '%s', 1, '%s', 1);" % (product,
- productdesc,
- milestoneurl,
- defaultmilestone)
-
- # Insert the components
- for category in categories_list:
- component = SqlQuote(category[0])
- productstr = SqlQuote(product)
- description = SqlQuote(category[1])
- initialowner = SqlQuote("3")
- print >>outfile, "\ninsert into components (";
- print >>outfile, " value, program, initialowner, initialqacontact,"
- print >>outfile, " description) values ("
- print >>outfile, " %s, %s, %s, '', %s);" % (component, productstr,
- initialowner, description)
-
- # Insert the versions
- for productstr, version_list in versions_table.items():
- productstr = SqlQuote(productstr)
- for version in version_list:
- version = SqlQuote(version)
- print >>outfile, "\ninsert into versions (value, program) "
- print >>outfile, " values (%s, %s);" % (version, productstr)
-
- # Insert the users
- for username, userid in usermapping.items():
- realname = map_username_to_realname(username)
- username = SqlQuote(username)
- realname = SqlQuote(realname)
- print >>outfile, "\ninsert into profiles ("
- print >>outfile, " userid, login_name, password, cryptpassword, realname, groupset"
- print >>outfile, ") values ("
- print >>outfile, "%s,%s,'password',encrypt('password'), %s, 0);" % (userid, username, realname)
- print >>outfile, "update profiles set groupset=1 << 32 where login_name like '%\@gcc.gnu.org';"
-
-def unixdate2datetime(unixdate):
- """ Convert a unix date to a datetime value """
- year, month, day, hour, min, sec, x, x, x, x = email.Utils.parsedate_tz(unixdate)
- return "%d-%02d-%02d %02d:%02d:%02d" % (year,month,day,hour,min,sec)
-
-def unixdate2timestamp(unixdate):
- """ Convert a unix date to a timestamp value """
- year, month, day, hour, min, sec, x, x, x, x = email.Utils.parsedate_tz(unixdate)
- return "%d%02d%02d%02d%02d%02d" % (year,month,day,hour,min,sec)
-
-def SqlQuote(str):
- """ Perform SQL quoting on a string """
- return "'%s'" % str.replace("'", """''""").replace("\\", "\\\\").replace("\0","\\0")
-
-def convert_gccver_to_ver(gccver):
- """ Given a gcc version, convert it to a Bugzilla version. """
- for k in releasetovermap.keys():
- if re.search(".*%s.*" % k, gccver) is not None:
- return releasetovermap[k]
- result = re.search(r""".*(\d\.\d) \d+ \(experimental\).*""", gccver)
- if result is not None:
- return result.group(1)
- return "unknown"
-
-def load_index(fname):
- """ Load in the GNATS index file """
- global pr_list
- ifp = open(fname)
- for record in ifp.xreadlines():
- fields = record.split("|")
- pr_list.append(fields[0])
- ifp.close()
-
-def load_categories(fname):
- """ Load in the GNATS categories file """
- global categories_list
- cfp = open(fname)
- for record in cfp.xreadlines():
- if re.search("^#", record) is not None:
- continue
- categories_list.append(record.split(":"))
- cfp.close()
-
-def map_username_to_realname(username):
- """ Given a username, find the real name """
- name = username
- name = re.sub("@.*", "", name)
- for responsible_record in responsible_list:
- if responsible_record[0] == name:
- return responsible_record[1]
- if len(responsible_record) > 2:
- if responsible_record[2] == username:
- return responsible_record[1]
- return ""
-
-
-def get_userid(responsible):
- """ Given an email address, get the user id """
- global responsible_map
- global usermapping
- global userid_base
- if responsible is None:
- return -1
- responsible = responsible.lower()
- responsible = re.sub("sources.redhat.com", "gcc.gnu.org", responsible)
- if responsible_map.has_key(responsible):
- responsible = responsible_map[responsible]
- if usermapping.has_key(responsible):
- return usermapping[responsible]
- else:
- usermapping[responsible] = userid_base
- userid_base += 1
- return usermapping[responsible]
-
-def load_responsible(fname):
- """ Load in the GNATS responsible file """
- global responsible_map
- global responsible_list
- rfp = open(fname)
- for record in rfp.xreadlines():
- if re.search("^#", record) is not None:
- continue
- split_record = record.split(":")
- responsible_map[split_record[0]] = split_record[2].rstrip()
- responsible_list.append(record.split(":"))
- rfp.close()
-
-def split_csl(list):
- """ Split a comma separated list """
- newlist = re.split(r"""\s*,\s*""", list)
- return newlist
-
-def fix_email_addrs(addrs):
- """ Perform various fixups and cleaning on an e-mail address """
- addrs = split_csl(addrs)
- trimmed_addrs = []
- for addr in addrs:
- addr = re.sub(r"""\(.*\)""","",addr)
- addr = re.sub(r""".*<(.*)>.*""","\\1",addr)
- addr = addr.rstrip()
- addr = addr.lstrip()
- trimmed_addrs.append(addr)
- addrs = ", ".join(trimmed_addrs)
- return addrs
-
-class Bugzillabug(object):
- """ Class representing a bugzilla bug """
- def __init__(self, gbug):
- """ Initialize a bugzilla bug from a GNATS bug. """
- self.bug_id = gbug.bug_id
- self.long_descs = []
- self.bug_ccs = [get_userid("gcc-bugs@gcc.gnu.org")]
- self.bug_activity = []
- self.attachments = gbug.attachments
- self.gnatsfields = gbug.fields
- self.need_unformatted = gbug.has_unformatted_attach == 0
- self.need_unformatted &= gbug.fields.has_key("Unformatted")
- self.translate_pr()
- self.update_versions()
- if self.fields.has_key("Audit-Trail"):
- self.parse_audit_trail()
- self.write_bug()
-
- def parse_fromto(type, string):
- """ Parses the from and to parts of a changed-from-to line """
- fromstr = ""
- tostr = ""
-
- # Some slightly messed up changed lines have unassigned-new,
- # instead of unassigned->new. So we make the > optional.
- result = re.search(r"""(.*)-(?:>?)(.*)""", string)
-
- # Only know how to handle parsing of State and Responsible
- # changed-from-to right now
- if type == "State":
- fromstr = state_lookup[result.group(1)]
- tostr = state_lookup[result.group(2)]
- elif type == "Responsible":
- if result.group(1) != "":
- fromstr = result.group(1)
- if result.group(2) != "":
- tostr = result.group(2)
- if responsible_map.has_key(fromstr):
- fromstr = responsible_map[fromstr]
- if responsible_map.has_key(tostr):
- tostr = responsible_map[tostr]
- return (fromstr, tostr)
- parse_fromto = staticmethod(parse_fromto)
-
- def parse_audit_trail(self):
- """ Parse a GNATS audit trail """
- trail = self.fields["Audit-Trail"]
- # Begin to split the audit trail into pieces
- result = fromtore.finditer(trail)
- starts = []
- ends = []
- pieces = []
- # Make a list of the pieces
- for x in result:
- pieces.append (x)
- # Find the start and end of each piece
- if len(pieces) > 0:
- for x in xrange(len(pieces)-1):
- starts.append(pieces[x].start())
- ends.append(pieces[x+1].start())
- starts.append(pieces[-1].start())
- ends.append(len(trail))
- pieces = []
- # Now make the list of actual text of the pieces
- for x in xrange(len(starts)):
- pieces.append(trail[starts[x]:ends[x]])
- # And parse the actual pieces
- for piece in pieces:
- result = changedfromtore.search(piece)
- # See what things we actually have inside this entry, and
- # handle them appropriately
- if result is not None:
- type = result.group(1)
- changedfromto = result.group(2)
- # If the bug was reopened, mark it as such
- if changedfromto.find("closed->analyzed") != -1:
- if self.fields["bug_status"] == "'NEW'":
- self.fields["bug_status"] = "'REOPENED'"
- if type == "State" or type == "Responsible":
- oldstate, newstate = self.parse_fromto (type, changedfromto)
- result = changedbyre.search(piece)
- if result is not None:
- changedby = result.group(1)
- result = changedwhenre.search(piece)
- if result is not None:
- changedwhen = result.group(1)
- changedwhen = unixdate2datetime(changedwhen)
- changedwhen = SqlQuote(changedwhen)
- result = changedwhyre.search(piece)
- changedwhy = piece[result.start(1):]
- #changedwhy = changedwhy.lstrip()
- changedwhy = changedwhy.rstrip()
- changedby = get_userid(changedby)
- # Put us on the cc list if we aren't there already
- if changedby != self.fields["userid"] \
- and changedby not in self.bug_ccs:
- self.bug_ccs.append(changedby)
- # If it's a duplicate, mark it as such
- result = duplicatere.search(changedwhy)
- if result is not None:
- newtext = "*** This bug has been marked as a duplicate of %s ***" % result.group(1)
- newtext = SqlQuote(newtext)
- self.long_descs.append((self.bug_id, changedby,
- changedwhen, newtext))
- self.fields["bug_status"] = "'RESOLVED'"
- self.fields["resolution"] = "'DUPLICATE'"
- self.fields["userid"] = changedby
- else:
- newtext = "%s-Changed-From-To: %s\n%s-Changed-Why: %s\n" % (type, changedfromto, type, changedwhy)
- newtext = SqlQuote(newtext)
- self.long_descs.append((self.bug_id, changedby,
- changedwhen, newtext))
- if type == "State" or type == "Responsible":
- newstate = SqlQuote("%s" % newstate)
- oldstate = SqlQuote("%s" % oldstate)
- fieldid = fieldids[type]
- self.bug_activity.append((newstate, oldstate, fieldid, changedby, changedwhen))
-
- else:
- # It's an email
- result = fromre.search(piece)
- if result is None:
- continue
- fromstr = result.group(1)
- fromstr = fix_email_addrs(fromstr)
- fromstr = get_userid(fromstr)
- result = datere.search(piece)
- if result is None:
- continue
- datestr = result.group(1)
- datestr = SqlQuote(unixdate2timestamp(datestr))
- if fromstr != self.fields["userid"] \
- and fromstr not in self.bug_ccs:
- self.bug_ccs.append(fromstr)
- self.long_descs.append((self.bug_id, fromstr, datestr,
- SqlQuote(piece)))
-
-
-
- def write_bug(self):
- """ Output a bug to the data file """
- fields = self.fields
- print >>outfile, "\ninsert into bugs("
- print >>outfile, " bug_id, assigned_to, bug_severity, priority, bug_status, creation_ts, delta_ts,"
- print >>outfile, " short_desc,"
- print >>outfile, " reporter, version,"
- print >>outfile, " product, component, resolution, target_milestone, qa_contact,"
- print >>outfile, " gccbuild, gcctarget, gcchost, keywords"
- print >>outfile, " ) values ("
- print >>outfile, "%s, %s, %s, %s, %s, %s, %s," % (self.bug_id, fields["userid"], fields["bug_severity"], fields["priority"], fields["bug_status"], fields["creation_ts"], fields["delta_ts"])
- print >>outfile, "%s," % (fields["short_desc"])
- print >>outfile, "%s, %s," % (fields["reporter"], fields["version"])
- print >>outfile, "%s, %s, %s, %s, 0," %(fields["product"], fields["component"], fields["resolution"], fields["target_milestone"])
- print >>outfile, "%s, %s, %s, %s" % (fields["gccbuild"], fields["gcctarget"], fields["gcchost"], fields["keywords"])
- print >>outfile, ");"
- if self.fields["keywords"] != 0:
- print >>outfile, "\ninsert into keywords (bug_id, keywordid) values ("
- print >>outfile, " %s, %s);" % (self.bug_id, fields["keywordid"])
- for id, who, when, text in self.long_descs:
- print >>outfile, "\ninsert into longdescs ("
- print >>outfile, " bug_id, who, bug_when, thetext) values("
- print >>outfile, " %s, %s, %s, %s);" % (id, who, when, text)
- for name, data, who in self.attachments:
- print >>outfile, "\ninsert into attachments ("
- print >>outfile, " bug_id, filename, description, mimetype, ispatch, submitter_id) values ("
- ftype = None
- # It's *magic*!
- if name.endswith(".ii") == 1:
- ftype = "text/x-c++"
- elif name.endswith(".i") == 1:
- ftype = "text/x-c"
- else:
- ftype = magicf.detect(cStringIO.StringIO(data))
- if ftype is None:
- ftype = "application/octet-stream"
-
- print >>outfile, "%s,%s,%s, %s,0, %s,%s);" %(self.bug_id, SqlQuote(name), SqlQuote(name), SqlQuote (ftype), who)
- print >>outfile, "\ninsert into attach_data ("
- print >>outfile, "\n(id, thedata) values (last_insert_id(),"
- print >>outfile, "%s);" % (SqlQuote(zlib.compress(data)))
- for newstate, oldstate, fieldid, changedby, changedwhen in self.bug_activity:
- print >>outfile, "\ninsert into bugs_activity ("
- print >>outfile, " bug_id, who, bug_when, fieldid, added, removed) values ("
- print >>outfile, " %s, %s, %s, %s, %s, %s);" % (self.bug_id,
- changedby,
- changedwhen,
- fieldid,
- newstate,
- oldstate)
- for cc in self.bug_ccs:
- print >>outfile, "\ninsert into cc(bug_id, who) values (%s, %s);" %(self.bug_id, cc)
- def update_versions(self):
- """ Update the versions table to account for the version on this bug """
- global versions_table
- if self.fields.has_key("Release") == 0 \
- or self.fields.has_key("Category") == 0:
- return
- curr_product = "gcc"
- curr_version = self.fields["Release"]
- if curr_version == "":
- return
- curr_version = convert_gccver_to_ver (curr_version)
- if versions_table.has_key(curr_product) == 0:
- versions_table[curr_product] = []
- for version in versions_table[curr_product]:
- if version == curr_version:
- return
- versions_table[curr_product].append(curr_version)
- def translate_pr(self):
- """ Transform a GNATS PR into a Bugzilla bug """
- self.fields = self.gnatsfields
- if (self.fields.has_key("Organization") == 0) \
- or self.fields["Organization"].find("GCC"):
- self.fields["Originator"] = ""
- self.fields["Organization"] = ""
- self.fields["Organization"].lstrip()
- if (self.fields.has_key("Release") == 0) \
- or self.fields["Release"] == "" \
- or self.fields["Release"].find("unknown-1.0") != -1:
- self.fields["Release"]="unknown"
- if self.fields.has_key("Responsible"):
- result = re.search(r"""\w+""", self.fields["Responsible"])
- self.fields["Responsible"] = "%s%s" % (result.group(0), "@gcc.gnu.org")
- self.fields["gcchost"] = ""
- self.fields["gcctarget"] = ""
- self.fields["gccbuild"] = ""
- if self.fields.has_key("Environment"):
- result = re.search("^host: (.+?)$", self.fields["Environment"],
- re.MULTILINE)
- if result is not None:
- self.fields["gcchost"] = result.group(1)
- result = re.search("^target: (.+?)$", self.fields["Environment"],
- re.MULTILINE)
- if result is not None:
- self.fields["gcctarget"] = result.group(1)
- result = re.search("^build: (.+?)$", self.fields["Environment"],
- re.MULTILINE)
- if result is not None:
- self.fields["gccbuild"] = result.group(1)
- self.fields["userid"] = get_userid(self.fields["Responsible"])
- self.fields["bug_severity"] = "normal"
- if self.fields["Class"] == "change-request":
- self.fields["bug_severity"] = "enhancement"
- elif self.fields.has_key("Severity"):
- if self.fields["Severity"] == "critical":
- self.fields["bug_severity"] = "critical"
- elif self.fields["Severity"] == "serious":
- self.fields["bug_severity"] = "major"
- elif self.fields.has_key("Synopsis"):
- if re.search("crash|assert", self.fields["Synopsis"]):
- self.fields["bug_severity"] = "critical"
- elif re.search("wrong|error", self.fields["Synopsis"]):
- self.fields["bug_severity"] = "major"
- self.fields["bug_severity"] = SqlQuote(self.fields["bug_severity"])
- self.fields["keywords"] = 0
- if keywordids.has_key(self.fields["Class"]):
- self.fields["keywords"] = self.fields["Class"]
- self.fields["keywordid"] = keywordids[self.fields["Class"]]
- self.fields["keywords"] = SqlQuote(self.fields["keywords"])
- self.fields["priority"] = "P1"
- if self.fields.has_key("Severity") and self.fields.has_key("Priority"):
- severity = self.fields["Severity"]
- priority = self.fields["Priority"]
- if severity == "critical":
- if priority == "high":
- self.fields["priority"] = "Highest"
- else:
- self.fields["priority"] = "High"
- elif severity == "serious":
- if priority == "low":
- self.fields["priority"] = "Low"
- else:
- self.fields["priority"] = "Normal"
- else:
- if priority == "high":
- self.fields["priority"] = "Low"
- else:
- self.fields["priority"] = "Lowest"
- self.fields["priority"] = SqlQuote(self.fields["priority"])
- state = self.fields["State"]
- if (state == "open" or state == "analyzed") and self.fields["userid"] != 3:
- self.fields["bug_status"] = "ASSIGNED"
- self.fields["resolution"] = ""
- elif state == "feedback":
- self.fields["bug_status"] = "WAITING"
- self.fields["resolution"] = ""
- elif state == "closed":
- self.fields["bug_status"] = "CLOSED"
- if self.fields.has_key("Class"):
- theclass = self.fields["Class"]
- if theclass.find("duplicate") != -1:
- self.fields["resolution"]="DUPLICATE"
- elif theclass.find("mistaken") != -1:
- self.fields["resolution"]="INVALID"
- else:
- self.fields["resolution"]="FIXED"
- else:
- self.fields["resolution"]="FIXED"
- elif state == "suspended":
- self.fields["bug_status"] = "SUSPENDED"
- self.fields["resolution"] = ""
- elif state == "analyzed" and self.fields["userid"] == 3:
- self.fields["bug_status"] = "NEW"
- self.fields["resolution"] = ""
- else:
- self.fields["bug_status"] = "UNCONFIRMED"
- self.fields["resolution"] = ""
- self.fields["bug_status"] = SqlQuote(self.fields["bug_status"])
- self.fields["resolution"] = SqlQuote(self.fields["resolution"])
- self.fields["creation_ts"] = ""
- if self.fields.has_key("Arrival-Date") and self.fields["Arrival-Date"] != "":
- self.fields["creation_ts"] = unixdate2datetime(self.fields["Arrival-Date"])
- self.fields["creation_ts"] = SqlQuote(self.fields["creation_ts"])
- self.fields["delta_ts"] = ""
- if self.fields.has_key("Audit-Trail"):
- result = lastdatere.findall(self.fields["Audit-Trail"])
- result.reverse()
- if len(result) > 0:
- self.fields["delta_ts"] = unixdate2timestamp(result[0])
- if self.fields["delta_ts"] == "":
- if self.fields.has_key("Arrival-Date") and self.fields["Arrival-Date"] != "":
- self.fields["delta_ts"] = unixdate2timestamp(self.fields["Arrival-Date"])
- self.fields["delta_ts"] = SqlQuote(self.fields["delta_ts"])
- self.fields["short_desc"] = SqlQuote(self.fields["Synopsis"])
- if self.fields.has_key("Reply-To") and self.fields["Reply-To"] != "":
- self.fields["reporter"] = get_userid(self.fields["Reply-To"])
- elif self.fields.has_key("Mail-Header"):
- result = re.search(r"""From .*?([\w.]+@[\w.]+)""", self.fields["Mail-Header"])
- if result:
- self.fields["reporter"] = get_userid(result.group(1))
- else:
- self.fields["reporter"] = get_userid(gnats_username)
- else:
- self.fields["reporter"] = get_userid(gnats_username)
- long_desc = self.fields["Description"]
- long_desc2 = ""
- for field in ["Release", "Environment", "How-To-Repeat"]:
- if self.fields.has_key(field) and self.fields[field] != "":
- long_desc += ("\n\n%s:\n" % field) + self.fields[field]
- if self.fields.has_key("Fix") and self.fields["Fix"] != "":
- long_desc2 = "Fix:\n" + self.fields["Fix"]
- if self.need_unformatted == 1 and self.fields["Unformatted"] != "":
- long_desc += "\n\nUnformatted:\n" + self.fields["Unformatted"]
- if long_desc != "":
- self.long_descs.append((self.bug_id, self.fields["reporter"],
- self.fields["creation_ts"],
- SqlQuote(long_desc)))
- if long_desc2 != "":
- self.long_descs.append((self.bug_id, self.fields["reporter"],
- self.fields["creation_ts"],
- SqlQuote(long_desc2)))
- for field in ["gcchost", "gccbuild", "gcctarget"]:
- self.fields[field] = SqlQuote(self.fields[field])
- self.fields["version"] = ""
- if self.fields["Release"] != "":
- self.fields["version"] = convert_gccver_to_ver (self.fields["Release"])
- self.fields["version"] = SqlQuote(self.fields["version"])
- self.fields["product"] = SqlQuote("gcc")
- self.fields["component"] = "invalid"
- if self.fields.has_key("Category"):
- self.fields["component"] = self.fields["Category"]
- self.fields["component"] = SqlQuote(self.fields["component"])
- self.fields["target_milestone"] = "---"
- if self.fields["version"].find("3.4") != -1:
- self.fields["target_milestone"] = "3.4"
- self.fields["target_milestone"] = SqlQuote(self.fields["target_milestone"])
- if self.fields["userid"] == 2:
- self.fields["userid"] = "\'NULL\'"
-
-class GNATSbug(object):
- """ Represents a single GNATS PR """
- def __init__(self, filename):
- self.attachments = []
- self.has_unformatted_attach = 0
- fp = open (filename)
- self.fields = self.parse_pr(fp.xreadlines())
- self.bug_id = int(self.fields["Number"])
- if self.fields.has_key("Unformatted"):
- self.find_gnatsweb_attachments()
- if self.fields.has_key("How-To-Repeat"):
- self.find_regular_attachments("How-To-Repeat")
- if self.fields.has_key("Fix"):
- self.find_regular_attachments("Fix")
-
- def get_attacher(fields):
- if fields.has_key("Reply-To") and fields["Reply-To"] != "":
- return get_userid(fields["Reply-To"])
- else:
- result = None
- if fields.has_key("Mail-Header"):
- result = re.search(r"""From .*?([\w.]+\@[\w.]+)""",
- fields["Mail-Header"])
- if result is not None:
- reporter = get_userid(result.group(1))
- else:
- reporter = get_userid(gnats_username)
- get_attacher = staticmethod(get_attacher)
- def find_regular_attachments(self, which):
- fields = self.fields
- while re.search("^begin [0-7]{3}", fields[which],
- re.DOTALL | re.MULTILINE):
- outfp = cStringIO.StringIO()
- infp = cStringIO.StringIO(fields[which])
- filename, start, end = specialuu.decode(infp, outfp, quiet=0)
- fields[which]=fields[which].replace(fields[which][start:end],
- "See attachments for %s\n" % filename)
- self.attachments.append((filename, outfp.getvalue(),
- self.get_attacher(fields)))
-
- def decode_gnatsweb_attachment(self, attachment):
- result = re.split(r"""\n\n""", attachment, 1)
- if len(result) == 1:
- return -1
- envelope, body = result
- envelope = uselessre.split(envelope)
- envelope.pop(0)
- # Turn the list of key, value into a dict of key => value
- attachinfo = dict([(envelope[i], envelope[i+1]) for i in xrange(0,len(envelope),2)])
- for x in attachinfo.keys():
- attachinfo[x] = attachinfo[x].rstrip()
- if (attachinfo.has_key("Content-Type") == 0) or \
- (attachinfo.has_key("Content-Disposition") == 0):
- raise ValueError, "Unable to parse file attachment"
- result = dispositionre.search(attachinfo["Content-Disposition"])
- filename = result.group(2)
- filename = re.sub(".*/","", filename)
- filename = re.sub(".*\\\\","", filename)
- attachinfo["filename"]=filename
- result = re.search("""(\S+);.*""", attachinfo["Content-Type"])
- if result is not None:
- attachinfo["Content-Type"] = result.group(1)
- if attachinfo.has_key("Content-Transfer-Encoding"):
- if attachinfo["Content-Transfer-Encoding"] == "base64":
- attachinfo["data"] = base64.decodestring(body)
- else:
- attachinfo["data"]=body
-
- return (attachinfo["filename"], attachinfo["data"],
- self.get_attacher(self.fields))
-
- def find_gnatsweb_attachments(self):
- fields = self.fields
- attachments = re.split(attachment_delimiter, fields["Unformatted"])
- fields["Unformatted"] = attachments.pop(0)
- for attachment in attachments:
- result = self.decode_gnatsweb_attachment (attachment)
- if result != -1:
- self.attachments.append(result)
- self.has_unformatted_attach = 1
- def parse_pr(lines):
- #fields = {"envelope":[]}
- fields = {"envelope":array.array("c")}
- hdrmulti = "envelope"
- for line in lines:
- line = line.rstrip('\n')
- line += '\n'
- result = gnatfieldre.search(line)
- if result is None:
- if hdrmulti != "":
- if fields.has_key(hdrmulti):
- #fields[hdrmulti].append(line)
- fields[hdrmulti].fromstring(line)
- else:
- #fields[hdrmulti] = [line]
- fields[hdrmulti] = array.array("c", line)
- continue
- hdr, arg = result.groups()
- ghdr = "*not valid*"
- result = fieldnamere.search(hdr)
- if result != None:
- ghdr = result.groups()[0]
- if ghdr in fieldnames:
- if multilinefields.has_key(ghdr):
- hdrmulti = ghdr
- #fields[ghdr] = [""]
- fields[ghdr] = array.array("c")
- else:
- hdrmulti = ""
- #fields[ghdr] = [arg]
- fields[ghdr] = array.array("c", arg)
- elif hdrmulti != "":
- #fields[hdrmulti].append(line)
- fields[hdrmulti].fromstring(line)
- if hdrmulti == "envelope" and \
- (hdr == "Reply-To" or hdr == "From" \
- or hdr == "X-GNATS-Notify"):
- arg = fix_email_addrs(arg)
- #fields[hdr] = [arg]
- fields[hdr] = array.array("c", arg)
- if fields.has_key("Reply-To") and len(fields["Reply-To"]) > 0:
- fields["Reply-To"] = fields["Reply-To"]
- else:
- fields["Reply-To"] = fields["From"]
- if fields.has_key("From"):
- del fields["From"]
- if fields.has_key("X-GNATS-Notify") == 0:
- fields["X-GNATS-Notify"] = array.array("c")
- #fields["X-GNATS-Notify"] = ""
- for x in fields.keys():
- fields[x] = fields[x].tostring()
- #fields[x] = "".join(fields[x])
- for x in fields.keys():
- if multilinefields.has_key(x):
- fields[x] = fields[x].rstrip()
-
- return fields
- parse_pr = staticmethod(parse_pr)
-load_index("%s/gnats-adm/index" % gnats_db_dir)
-load_categories("%s/gnats-adm/categories" % gnats_db_dir)
-load_responsible("%s/gnats-adm/responsible" % gnats_db_dir)
-get_userid(gnats_username)
-get_userid(unassigned_username)
-for x in pr_list:
- print "Processing %s..." % x
- a = GNATSbug ("%s/%s" % (gnats_db_dir, x))
- b = Bugzillabug(a)
-write_non_bug_tables()
-outfile.close()
diff --git a/contrib/gnatsparse/magic.py b/contrib/gnatsparse/magic.py
deleted file mode 100755
index 049a7e19b..000000000
--- a/contrib/gnatsparse/magic.py
+++ /dev/null
@@ -1,712 +0,0 @@
-# Found on a russian zope mailing list, and modified to fix bugs in parsing
-# the magic file and string making
-# -- Daniel Berlin <dberlin@dberlin.org>
-import sys, struct, time, re, exceptions, pprint, stat, os, pwd, grp
-
-_mew = 0
-
-# _magic='/tmp/magic'
-# _magic='/usr/share/magic.mime'
-_magic='/usr/share/magic.mime'
-mime = 1
-
-_ldate_adjust = lambda x: time.mktime( time.gmtime(x) )
-
-BUFFER_SIZE = 1024 * 128 # 128K should be enough...
-
-class MagicError(exceptions.Exception): pass
-
-def _handle(fmt='@x',adj=None): return fmt, struct.calcsize(fmt), adj
-
-KnownTypes = {
- # 'byte':_handle('@b'),
- 'byte':_handle('@B'),
- 'ubyte':_handle('@B'),
-
- 'string':('s',0,None),
- 'pstring':_handle('p'),
-
-# 'short':_handle('@h'),
-# 'beshort':_handle('>h'),
-# 'leshort':_handle('<h'),
- 'short':_handle('@H'),
- 'beshort':_handle('>H'),
- 'leshort':_handle('<H'),
- 'ushort':_handle('@H'),
- 'ubeshort':_handle('>H'),
- 'uleshort':_handle('<H'),
-
- 'long':_handle('@l'),
- 'belong':_handle('>l'),
- 'lelong':_handle('<l'),
- 'ulong':_handle('@L'),
- 'ubelong':_handle('>L'),
- 'ulelong':_handle('<L'),
-
- 'date':_handle('=l'),
- 'bedate':_handle('>l'),
- 'ledate':_handle('<l'),
- 'ldate':_handle('=l',_ldate_adjust),
- 'beldate':_handle('>l',_ldate_adjust),
- 'leldate':_handle('<l',_ldate_adjust),
-}
-
-_mew_cnt = 0
-def mew(x):
- global _mew_cnt
- if _mew :
- if x=='.' :
- _mew_cnt += 1
- if _mew_cnt % 64 == 0 : sys.stderr.write( '\n' )
- sys.stderr.write( '.' )
- else:
- sys.stderr.write( '\b'+x )
-
-def has_format(s):
- n = 0
- l = None
- for c in s :
- if c == '%' :
- if l == '%' : n -= 1
- else : n += 1
- l = c
- return n
-
-def read_asciiz(file,size=None,pos=None):
- s = []
- if pos :
- mew('s')
- file.seek( pos, 0 )
- mew('z')
- if size is not None :
- s = [file.read( size ).split('\0')[0]]
- else:
- while 1 :
- c = file.read(1)
- if (not c) or (ord(c)==0) or (c=='\n') : break
- s.append (c)
- mew('Z')
- return ''.join(s)
-
-def a2i(v,base=0):
- if v[-1:] in 'lL' : v = v[:-1]
- return int( v, base )
-
-_cmap = {
- '\\' : '\\',
- '0' : '\0',
-}
-for c in range(ord('a'),ord('z')+1) :
- try : e = eval('"\\%c"' % chr(c))
- except ValueError : pass
- else : _cmap[chr(c)] = e
-else:
- del c
- del e
-
-def make_string(s):
- return eval( '"'+s.replace('"','\\"')+'"')
-
-class MagicTestError(MagicError): pass
-
-class MagicTest:
- def __init__(self,offset,mtype,test,message,line=None,level=None):
- self.line, self.level = line, level
- self.mtype = mtype
- self.mtest = test
- self.subtests = []
- self.mask = None
- self.smod = None
- self.nmod = None
- self.offset, self.type, self.test, self.message = \
- offset,mtype,test,message
- if self.mtype == 'true' : return # XXX hack to enable level skips
- if test[-1:]=='\\' and test[-2:]!='\\\\' :
- self.test += 'n' # looks like someone wanted EOL to match?
- if mtype[:6]=='string' :
- if '/' in mtype : # for strings
- self.type, self.smod = \
- mtype[:mtype.find('/')], mtype[mtype.find('/')+1:]
- else:
- for nm in '&+-' :
- if nm in mtype : # for integer-based
- self.nmod, self.type, self.mask = (
- nm,
- mtype[:mtype.find(nm)],
- # convert mask to int, autodetect base
- int( mtype[mtype.find(nm)+1:], 0 )
- )
- break
- self.struct, self.size, self.cast = KnownTypes[ self.type ]
- def __str__(self):
- return '%s %s %s %s' % (
- self.offset, self.mtype, self.mtest, self.message
- )
- def __repr__(self):
- return 'MagicTest(%s,%s,%s,%s,line=%s,level=%s,subtests=\n%s%s)' % (
- `self.offset`, `self.mtype`, `self.mtest`, `self.message`,
- `self.line`, `self.level`,
- '\t'*self.level, pprint.pformat(self.subtests)
- )
- def run(self,file):
- result = ''
- do_close = 0
- try:
- if type(file) == type('x') :
- file = open( file, 'r', BUFFER_SIZE )
- do_close = 1
-# else:
-# saved_pos = file.tell()
- if self.mtype != 'true' :
- data = self.read(file)
- last = file.tell()
- else:
- data = last = None
- if self.check( data ) :
- result = self.message+' '
- if has_format( result ) : result %= data
- for test in self.subtests :
- m = test.run(file)
- if m is not None : result += m
- return make_string( result )
- finally:
- if do_close :
- file.close()
-# else:
-# file.seek( saved_pos, 0 )
- def get_mod_and_value(self):
- if self.type[-6:] == 'string' :
- # "something like\tthis\n"
- if self.test[0] in '=<>' :
- mod, value = self.test[0], make_string( self.test[1:] )
- else:
- mod, value = '=', make_string( self.test )
- else:
- if self.test[0] in '=<>&^' :
- mod, value = self.test[0], a2i(self.test[1:])
- elif self.test[0] == 'x':
- mod = self.test[0]
- value = 0
- else:
- mod, value = '=', a2i(self.test)
- return mod, value
- def read(self,file):
- mew( 's' )
- file.seek( self.offset(file), 0 ) # SEEK_SET
- mew( 'r' )
- try:
- data = rdata = None
- # XXX self.size might be 0 here...
- if self.size == 0 :
- # this is an ASCIIZ string...
- size = None
- if self.test != '>\\0' : # magic's hack for string read...
- value = self.get_mod_and_value()[1]
- size = (value=='\0') and None or len(value)
- rdata = data = read_asciiz( file, size=size )
- else:
- rdata = file.read( self.size )
- if not rdata or (len(rdata)!=self.size) : return None
- data = struct.unpack( self.struct, rdata )[0] # XXX hack??
- except:
- print >>sys.stderr, self
- print >>sys.stderr, '@%s struct=%s size=%d rdata=%s' % (
- self.offset, `self.struct`, self.size,`rdata`)
- raise
- mew( 'R' )
- if self.cast : data = self.cast( data )
- if self.mask :
- try:
- if self.nmod == '&' : data &= self.mask
- elif self.nmod == '+' : data += self.mask
- elif self.nmod == '-' : data -= self.mask
- else: raise MagicTestError(self.nmod)
- except:
- print >>sys.stderr,'data=%s nmod=%s mask=%s' % (
- `data`, `self.nmod`, `self.mask`
- )
- raise
- return data
- def check(self,data):
- mew('.')
- if self.mtype == 'true' :
- return '' # not None !
- mod, value = self.get_mod_and_value()
- if self.type[-6:] == 'string' :
- # "something like\tthis\n"
- if self.smod :
- xdata = data
- if 'b' in self.smod : # all blanks are optional
- xdata = ''.join( data.split() )
- value = ''.join( value.split() )
- if 'c' in self.smod : # all blanks are optional
- xdata = xdata.upper()
- value = value.upper()
- # if 'B' in self.smod : # compact blanks
- ### XXX sorry, i don't understand this :-(
- # data = ' '.join( data.split() )
- # if ' ' not in data : return None
- else:
- xdata = data
- try:
- if mod == '=' : result = data == value
- elif mod == '<' : result = data < value
- elif mod == '>' : result = data > value
- elif mod == '&' : result = data & value
- elif mod == '^' : result = (data & (~value)) == 0
- elif mod == 'x' : result = 1
- else : raise MagicTestError(self.test)
- if result :
- zdata, zval = `data`, `value`
- if self.mtype[-6:]!='string' :
- try: zdata, zval = hex(data), hex(value)
- except: zdata, zval = `data`, `value`
- if 0 : print >>sys.stderr, '%s @%s %s:%s %s %s => %s (%s)' % (
- '>'*self.level, self.offset,
- zdata, self.mtype, `mod`, zval, `result`,
- self.message
- )
- return result
- except:
- print >>sys.stderr,'mtype=%s data=%s mod=%s value=%s' % (
- `self.mtype`, `data`, `mod`, `value`
- )
- raise
- def add(self,mt):
- if not isinstance(mt,MagicTest) :
- raise MagicTestError((mt,'incorrect subtest type %s'%(type(mt),)))
- if mt.level == self.level+1 :
- self.subtests.append( mt )
- elif self.subtests :
- self.subtests[-1].add( mt )
- elif mt.level > self.level+1 :
- # it's possible to get level 3 just after level 1 !!! :-(
- level = self.level + 1
- while level < mt.level :
- xmt = MagicTest(None,'true','x','',line=self.line,level=level)
- self.add( xmt )
- level += 1
- else:
- self.add( mt ) # retry...
- else:
- raise MagicTestError((mt,'incorrect subtest level %s'%(`mt.level`,)))
- def last_test(self):
- return self.subtests[-1]
-#end class MagicTest
-
-class OffsetError(MagicError): pass
-
-class Offset:
- pos_format = {'b':'<B','B':'>B','s':'<H','S':'>H','l':'<I','L':'>I',}
- pattern0 = re.compile(r''' # mere offset
- ^
- &? # possible ampersand
- ( 0 # just zero
- | [1-9]{1,1}[0-9]* # decimal
- | 0[0-7]+ # octal
- | 0x[0-9a-f]+ # hex
- )
- $
- ''', re.X|re.I
- )
- pattern1 = re.compile(r''' # indirect offset
- ^\(
- (?P<base>&?0 # just zero
- |&?[1-9]{1,1}[0-9]* # decimal
- |&?0[0-7]* # octal
- |&?0x[0-9A-F]+ # hex
- )
- (?P<type>
- \. # this dot might be alone
- [BSL]? # one of this chars in either case
- )?
- (?P<sign>
- [-+]{0,1}
- )?
- (?P<off>0 # just zero
- |[1-9]{1,1}[0-9]* # decimal
- |0[0-7]* # octal
- |0x[0-9a-f]+ # hex
- )?
- \)$''', re.X|re.I
- )
- def __init__(self,s):
- self.source = s
- self.value = None
- self.relative = 0
- self.base = self.type = self.sign = self.offs = None
- m = Offset.pattern0.match( s )
- if m : # just a number
- if s[0] == '&' :
- self.relative, self.value = 1, int( s[1:], 0 )
- else:
- self.value = int( s, 0 )
- return
- m = Offset.pattern1.match( s )
- if m : # real indirect offset
- try:
- self.base = m.group('base')
- if self.base[0] == '&' :
- self.relative, self.base = 1, int( self.base[1:], 0 )
- else:
- self.base = int( self.base, 0 )
- if m.group('type') : self.type = m.group('type')[1:]
- self.sign = m.group('sign')
- if m.group('off') : self.offs = int( m.group('off'), 0 )
- if self.sign == '-' : self.offs = 0 - self.offs
- except:
- print >>sys.stderr, '$$', m.groupdict()
- raise
- return
- raise OffsetError(`s`)
- def __call__(self,file=None):
- if self.value is not None : return self.value
- pos = file.tell()
- try:
- if not self.relative : file.seek( self.offset, 0 )
- frmt = Offset.pos_format.get( self.type, 'I' )
- size = struct.calcsize( frmt )
- data = struct.unpack( frmt, file.read( size ) )
- if self.offs : data += self.offs
- return data
- finally:
- file.seek( pos, 0 )
- def __str__(self): return self.source
- def __repr__(self): return 'Offset(%s)' % `self.source`
-#end class Offset
-
-class MagicFileError(MagicError): pass
-
-class MagicFile:
- def __init__(self,filename=_magic):
- self.file = None
- self.tests = []
- self.total_tests = 0
- self.load( filename )
- self.ack_tests = None
- self.nak_tests = None
- def __del__(self):
- self.close()
- def load(self,filename=None):
- self.open( filename )
- self.parse()
- self.close()
- def open(self,filename=None):
- self.close()
- if filename is not None :
- self.filename = filename
- self.file = open( self.filename, 'r', BUFFER_SIZE )
- def close(self):
- if self.file :
- self.file.close()
- self.file = None
- def parse(self):
- line_no = 0
- for line in self.file.xreadlines() :
- line_no += 1
- if not line or line[0]=='#' : continue
- line = line.lstrip().rstrip('\r\n')
- if not line or line[0]=='#' : continue
- try:
- x = self.parse_line( line )
- if x is None :
- print >>sys.stderr, '#[%04d]#'%line_no, line
- continue
- except:
- print >>sys.stderr, '###[%04d]###'%line_no, line
- raise
- self.total_tests += 1
- level, offset, mtype, test, message = x
- new_test = MagicTest(offset,mtype,test,message,
- line=line_no,level=level)
- try:
- if level == 0 :
- self.tests.append( new_test )
- else:
- self.tests[-1].add( new_test )
- except:
- if 1 :
- print >>sys.stderr, 'total tests=%s' % (
- `self.total_tests`,
- )
- print >>sys.stderr, 'level=%s' % (
- `level`,
- )
- print >>sys.stderr, 'tests=%s' % (
- pprint.pformat(self.tests),
- )
- raise
- else:
- while self.tests[-1].level > 0 :
- self.tests.pop()
- def parse_line(self,line):
- # print >>sys.stderr, 'line=[%s]' % line
- if (not line) or line[0]=='#' : return None
- level = 0
- offset = mtype = test = message = ''
- mask = None
- # get optional level (count leading '>')
- while line and line[0]=='>' :
- line, level = line[1:], level+1
- # get offset
- while line and not line[0].isspace() :
- offset, line = offset+line[0], line[1:]
- try:
- offset = Offset(offset)
- except:
- print >>sys.stderr, 'line=[%s]' % line
- raise
- # skip spaces
- line = line.lstrip()
- # get type
- c = None
- while line :
- last_c, c, line = c, line[0], line[1:]
- if last_c!='\\' and c.isspace() :
- break # unescaped space - end of field
- else:
- mtype += c
- if last_c == '\\' :
- c = None # don't fuck my brain with sequential backslashes
- # skip spaces
- line = line.lstrip()
- # get test
- c = None
- while line :
- last_c, c, line = c, line[0], line[1:]
- if last_c!='\\' and c.isspace() :
- break # unescaped space - end of field
- else:
- test += c
- if last_c == '\\' :
- c = None # don't fuck my brain with sequential backslashes
- # skip spaces
- line = line.lstrip()
- # get message
- message = line
- if mime and line.find("\t") != -1:
- message=line[0:line.find("\t")]
- #
- # print '>>', level, offset, mtype, test, message
- return level, offset, mtype, test, message
- def detect(self,file):
- self.ack_tests = 0
- self.nak_tests = 0
- answers = []
- for test in self.tests :
- message = test.run( file )
- if message :
- self.ack_tests += 1
- answers.append( message )
- else:
- self.nak_tests += 1
- if answers :
- return '; '.join( answers )
-#end class MagicFile
-
-def username(uid):
- try:
- return pwd.getpwuid( uid )[0]
- except:
- return '#%s'%uid
-
-def groupname(gid):
- try:
- return grp.getgrgid( gid )[0]
- except:
- return '#%s'%gid
-
-def get_file_type(fname,follow):
- t = None
- if not follow :
- try:
- st = os.lstat( fname ) # stat that entry, don't follow links!
- except os.error, why :
- pass
- else:
- if stat.S_ISLNK(st[stat.ST_MODE]) :
- t = 'symbolic link'
- try:
- lnk = os.readlink( fname )
- except:
- t += ' (unreadable)'
- else:
- t += ' to '+lnk
- if t is None :
- try:
- st = os.stat( fname )
- except os.error, why :
- return "can't stat `%s' (%s)." % (why.filename,why.strerror)
-
- dmaj, dmin = (st.st_rdev>>8)&0x0FF, st.st_rdev&0x0FF
-
- if 0 : pass
- elif stat.S_ISSOCK(st.st_mode) : t = 'socket'
- elif stat.S_ISLNK (st.st_mode) : t = follow and 'symbolic link' or t
- elif stat.S_ISREG (st.st_mode) : t = 'file'
- elif stat.S_ISBLK (st.st_mode) : t = 'block special (%d/%d)'%(dmaj,dmin)
- elif stat.S_ISDIR (st.st_mode) : t = 'directory'
- elif stat.S_ISCHR (st.st_mode) : t = 'character special (%d/%d)'%(dmaj,dmin)
- elif stat.S_ISFIFO(st.st_mode) : t = 'pipe'
- else: t = '<unknown>'
-
- if st.st_mode & stat.S_ISUID :
- t = 'setuid(%d=%s) %s'%(st.st_uid,username(st.st_uid),t)
- if st.st_mode & stat.S_ISGID :
- t = 'setgid(%d=%s) %s'%(st.st_gid,groupname(st.st_gid),t)
- if st.st_mode & stat.S_ISVTX :
- t = 'sticky '+t
-
- return t
-
-HELP = '''%s [options] [files...]
-
-Options:
-
- -?, --help -- this help
- -m, --magic=<file> -- use this magic <file> instead of %s
- -f, --files=<namefile> -- read filenames for <namefile>
-* -C, --compile -- write "compiled" magic file
- -b, --brief -- don't prepend filenames to output lines
-+ -c, --check -- check the magic file
- -i, --mime -- output MIME types
-* -k, --keep-going -- don't stop st the first match
- -n, --flush -- flush stdout after each line
- -v, --verson -- print version and exit
-* -z, --compressed -- try to look inside compressed files
- -L, --follow -- follow symlinks
- -s, --special -- don't skip special files
-
-* -- not implemented so far ;-)
-+ -- implemented, but in another way...
-'''
-
-def main():
- import getopt
- global _magic
- try:
- brief = 0
- flush = 0
- follow= 0
- mime = 0
- check = 0
- special=0
- try:
- opts, args = getopt.getopt(
- sys.argv[1:],
- '?m:f:CbciknvzLs',
- ( 'help',
- 'magic=',
- 'names=',
- 'compile',
- 'brief',
- 'check',
- 'mime',
- 'keep-going',
- 'flush',
- 'version',
- 'compressed',
- 'follow',
- 'special',
- )
- )
- except getopt.error, why:
- print >>sys.stderr, sys.argv[0], why
- return 1
- else:
- files = None
- for o,v in opts :
- if o in ('-?','--help'):
- print HELP % (
- sys.argv[0],
- _magic,
- )
- return 0
- elif o in ('-f','--files='):
- files = v
- elif o in ('-m','--magic='):
- _magic = v[:]
- elif o in ('-C','--compile'):
- pass
- elif o in ('-b','--brief'):
- brief = 1
- elif o in ('-c','--check'):
- check = 1
- elif o in ('-i','--mime'):
- mime = 1
- if os.path.exists( _magic+'.mime' ) :
- _magic += '.mime'
- print >>sys.stderr,sys.argv[0]+':',\
- "Using regular magic file `%s'" % _magic
- elif o in ('-k','--keep-going'):
- pass
- elif o in ('-n','--flush'):
- flush = 1
- elif o in ('-v','--version'):
- print 'VERSION'
- return 0
- elif o in ('-z','--compressed'):
- pass
- elif o in ('-L','--follow'):
- follow = 1
- elif o in ('-s','--special'):
- special = 1
- else:
- if files :
- files = map(lambda x: x.strip(), v.split(','))
- if '-' in files and '-' in args :
- error( 1, 'cannot use STDIN simultaneously for file list and data' )
- for file in files :
- for name in (
- (file=='-')
- and sys.stdin
- or open(file,'r',BUFFER_SIZE)
- ).xreadlines():
- name = name.strip()
- if name not in args :
- args.append( name )
- try:
- if check : print >>sys.stderr, 'Loading magic database...'
- t0 = time.time()
- m = MagicFile(_magic)
- t1 = time.time()
- if check :
- print >>sys.stderr, \
- m.total_tests, 'tests loaded', \
- 'for', '%.2f' % (t1-t0), 'seconds'
- print >>sys.stderr, len(m.tests), 'tests at top level'
- return 0 # XXX "shortened" form ;-)
-
- mlen = max( map(len, args) )+1
- for arg in args :
- if not brief : print (arg + ':').ljust(mlen),
- ftype = get_file_type( arg, follow )
- if (special and ftype.find('special')>=0) \
- or ftype[-4:] == 'file' :
- t0 = time.time()
- try:
- t = m.detect( arg )
- except (IOError,os.error), why:
- t = "can't read `%s' (%s)" % (why.filename,why.strerror)
- if ftype[-4:] == 'file' : t = ftype[:-4] + t
- t1 = time.time()
- print t and t or 'data'
- if 0 : print \
- '#\t%d tests ok, %d tests failed for %.2f seconds'%\
- (m.ack_tests, m.nak_tests, t1-t0)
- else:
- print mime and 'application/x-not-regular-file' or ftype
- if flush : sys.stdout.flush()
- # print >>sys.stderr, 'DONE'
- except:
- if check : return 1
- raise
- else:
- return 0
- finally:
- pass
-
-if __name__ == '__main__' :
- sys.exit( main() )
-# vim:ai
-# EOF #
diff --git a/contrib/gnatsparse/specialuu.py b/contrib/gnatsparse/specialuu.py
deleted file mode 100755
index b729d9c59..000000000
--- a/contrib/gnatsparse/specialuu.py
+++ /dev/null
@@ -1,104 +0,0 @@
-#! /usr/bin/env python2.2
-
-# Copyright 1994 by Lance Ellinghouse
-# Cathedral City, California Republic, United States of America.
-# All Rights Reserved
-# Permission to use, copy, modify, and distribute this software and its
-# documentation for any purpose and without fee is hereby granted,
-# provided that the above copyright notice appear in all copies and that
-# both that copyright notice and this permission notice appear in
-# supporting documentation, and that the name of Lance Ellinghouse
-# not be used in advertising or publicity pertaining to distribution
-# of the software without specific, written prior permission.
-# LANCE ELLINGHOUSE DISCLAIMS ALL WARRANTIES WITH REGARD TO
-# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
-# FITNESS, IN NO EVENT SHALL LANCE ELLINGHOUSE CENTRUM BE LIABLE
-# FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
-# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
-# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
-# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-#
-# Modified by Jack Jansen, CWI, July 1995:
-# - Use binascii module to do the actual line-by-line conversion
-# between ascii and binary. This results in a 1000-fold speedup. The C
-# version is still 5 times faster, though.
-# - Arguments more compliant with python standard
-
-"""Implementation of the UUencode and UUdecode functions.
-
-encode(in_file, out_file [,name, mode])
-decode(in_file [, out_file, mode])
-"""
-
-import binascii
-import os
-import sys
-from types import StringType
-
-__all__ = ["Error", "decode"]
-
-class Error(Exception):
- pass
-
-def decode(in_file, out_file=None, mode=None, quiet=0):
- """Decode uuencoded file"""
- #
- # Open the input file, if needed.
- #
- if in_file == '-':
- in_file = sys.stdin
- elif isinstance(in_file, StringType):
- in_file = open(in_file)
- #
- # Read until a begin is encountered or we've exhausted the file
- #
- while 1:
- hdr = in_file.readline()
- if not hdr:
- raise Error, 'No valid begin line found in input file'
- if hdr[:5] != 'begin':
- continue
- hdrfields = hdr.split(" ", 2)
- if len(hdrfields) == 3 and hdrfields[0] == 'begin':
- try:
- int(hdrfields[1], 8)
- start_pos = in_file.tell() - len (hdr)
- break
- except ValueError:
- pass
- if out_file is None:
- out_file = hdrfields[2].rstrip()
- if os.path.exists(out_file):
- raise Error, 'Cannot overwrite existing file: %s' % out_file
- if mode is None:
- mode = int(hdrfields[1], 8)
- #
- # Open the output file
- #
- if out_file == '-':
- out_file = sys.stdout
- elif isinstance(out_file, StringType):
- fp = open(out_file, 'wb')
- try:
- os.path.chmod(out_file, mode)
- except AttributeError:
- pass
- out_file = fp
- #
- # Main decoding loop
- #
- s = in_file.readline()
- while s and s.strip() != 'end':
- try:
- data = binascii.a2b_uu(s)
- except binascii.Error, v:
- # Workaround for broken uuencoders by /Fredrik Lundh
- nbytes = (((ord(s[0])-32) & 63) * 4 + 5) / 3
- data = binascii.a2b_uu(s[:nbytes])
- if not quiet:
- sys.stderr.write("Warning: %s\n" % str(v))
- out_file.write(data)
- s = in_file.readline()
-# if not s:
- # raise Error, 'Truncated input file'
- return (hdrfields[2].rstrip(), start_pos, in_file.tell())