Florian Weimer
2005-Sep-12 16:32 UTC
[Secure-testing-commits] r1934 - / bin lib lib/python stamps
Author: fw Date: 2005-09-12 16:32:23 +0000 (Mon, 12 Sep 2005) New Revision: 1934 Added: Makefile bin/check-syntax bin/update-bug-list-db lib/ lib/python/ lib/python/bugs.py lib/python/debian_support.py lib/python/security_db.py stamps/ Log: Add list parser written in Python. "make check" runs a syntax check (no SQLite required). "make all" updates the SQLite database, and performs cross-list consistency checks. There is some support for loading Debian Package/Sources files, but this information is currently not used by the checks. Added: Makefile ==================================================================--- Makefile 2005-09-12 16:27:44 UTC (rev 1933) +++ Makefile 2005-09-12 16:32:23 UTC (rev 1934) @@ -0,0 +1,33 @@ +PYTHON = python +PYTHON_MODULES = $(wildcard lib/python/*.py) +BUG_LISTS = $(wildcard data/*/list) + +all: stamps/bug-lists-imported + +stamps/bug-lists-imported: bin/update-bug-list-db \ + $(BUG_LISTS) $(PYTHON_MODULES) + $(PYTHON) bin/update-bug-list-db + touch $@ + +.PHONY: check check-syntax + +test check: check-syntax + +check-syntax: stamps/CAN-syntax stamps/CVE-syntax \ + stamps/DSA-syntax stamps/DTSA-syntax + +stamps/CAN-syntax: data/CAN/list bin/check-syntax $(PYTHON_MODULES) + $(PYTHON) bin/check-syntax CAN data/CAN/list + touch $@ + +stamps/CVE-syntax: data/CVE/list bin/check-syntax $(PYTHON_MODULES) + $(PYTHON) bin/check-syntax CVE data/CVE/list + touch $@ + +stamps/DSA-syntax: data/DSA/list bin/check-syntax $(PYTHON_MODULES) + $(PYTHON) bin/check-syntax DSA data/DSA/list + touch $@ + +stamps/DTSA-syntax: data/DTSA/list bin/check-syntax $(PYTHON_MODULES) + $(PYTHON) bin/check-syntax DTSA data/DTSA/list + touch $@ Added: bin/check-syntax ==================================================================--- bin/check-syntax 2005-09-12 16:27:44 UTC (rev 1933) +++ bin/check-syntax 2005-09-12 16:32:23 UTC (rev 1934) @@ -0,0 +1,70 @@ +#!/usr/bin/python + +import os +import os.path +import string +import sys + +def setup_paths(): + 
check_file = ''lib/python/debian_support.py'' + path = os.getcwd() + while 1: + if os.path.exists("%s/%s" % (path, check_file)): + sys.path = [path + ''/lib/python''] + sys.path + return path + idx = string.rfind(path, ''/'') + if idx == -1: + raise ImportError, "could not setup paths" + path = path[0:idx] +root_path = setup_paths() + +import bugs + +def do_parse(f): + names = {} + errors = False + for r in f: + n = r.name + if n[0:4] in (''CAN'', ''CVE''): + n = n[4:] + if names.has_key(n): + if names[n] <> r.name: + sys.stderr.write("error: duplicate CVE entry: %s and %s\n" + % (names[n], r.name)) + else: + sys.stderr.write("error: duplicate CVE entry: %s\n" + % r.name) + errors = True + names[n] = r.name + if errors: + sys.exit(1) + + +def parse_CAN(name): + do_parse(bugs.CVEFile(name)) + +def parse_CVE(name): + f = bugs.CVEFile(name) + # Relax syntax checking a bit. + f.no_version_needs_note = False + do_parse(f) + +def parse_DSA(name): + do_parse(bugs.DSAFile(name)) + +def parse_DTSA(name): + do_parse(bugs.DTSAFile(name)) + +file_types = {''CAN'' : parse_CAN, + ''CVE'' : parse_CVE, + ''DSA'' : parse_DSA, + ''DTSA'' : parse_DTSA} + +if len(sys.argv) <> 3 or not file_types.has_key(sys.argv[1]): + l = file_types.keys() + l.sort() + sys.stderr.write("usage: check-syntax {%s} file-name\n" + % ''|''.join(l)) + sys.exit(1) + +file_types[sys.argv[1]](sys.argv[2]) Property changes on: bin/check-syntax ___________________________________________________________________ Name: svn:executable + * Added: bin/update-bug-list-db ==================================================================--- bin/update-bug-list-db 2005-09-12 16:27:44 UTC (rev 1933) +++ bin/update-bug-list-db 2005-09-12 16:32:23 UTC (rev 1934) @@ -0,0 +1,50 @@ +#!/usr/bin/python + +import os +import os.path +import string +import sys + +def setup_paths(): + check_file = ''lib/python/debian_support.py'' + path = os.getcwd() + while 1: + if os.path.exists("%s/%s" % (path, check_file)): + sys.path = [path + 
''/lib/python''] + sys.path + return path + idx = string.rfind(path, ''/'') + if idx == -1: + raise ImportError, "could not setup paths" + path = path[0:idx] +root_path = setup_paths() + +import bugs +import security_db + +db_file = root_path + ''/data/security.db'' +new_file = not os.path.exists(db_file) +db = security_db.DB(db_file) +if new_file: + db.initSchema() +cursor = db.writeTxn() +db.deleteBugs(cursor) +try: + db.insertBugs(cursor, bugs.CVEFile(root_path + ''/data/CAN/list'')) + db.insertBugs(cursor, bugs.CVEFile(root_path + ''/data/CVE/list'', + no_version_needs_note=False)) + db.insertBugs(cursor, bugs.DSAFile(root_path + ''/data/DSA/list'')) + db.insertBugs(cursor, bugs.DTSAFile(root_path + ''/data/DTSA/list'')) +except security_db.InsertError, e: + db.rollback(cursor) + for err in e.errors: + print err + sys.exit(1) + +warnings = db.finishBugs(cursor) +if warnings: + db.rollback(cursor) + for x in warnings: + print "error:", x + sys.exit(1) +else: + db.commit(cursor) Property changes on: bin/update-bug-list-db ___________________________________________________________________ Name: svn:executable + * Property changes on: lib/python ___________________________________________________________________ Name: svn:ignore + *.pyc *.pyo *.db Added: lib/python/bugs.py ==================================================================--- lib/python/bugs.py 2005-09-12 16:27:44 UTC (rev 1933) +++ lib/python/bugs.py 2005-09-12 16:32:23 UTC (rev 1934) @@ -0,0 +1,657 @@ +# bugs.py -- read bug lists used by Debian''s testing security team +# Copyright (C) 2005 Florian Weimer <fw@deneb.enyo.de> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +import debian_support +import re +import types + +class Urgency(debian_support.PseudoEnum): pass + +def listUrgencies(): + urgencies = {} + urgs = ("high", "medium", "low", "unimportant", "unknown") + for u in range(len(urgs)): + urgencies[urgs[u]] = Urgency(urgs[u], -u) + Urgency.urgencies = urgencies + return urgencies +def internUrgency(name, urgencies=listUrgencies()): + if urgencies.has_key(name): + return urgencies[name] + else: + return None +del listUrgencies + +class PackageNote: + """A package note. + + The following member variables are defined: + + release - the release the package note applies to; None means "testing", + notes for other releases never apply to testing + """ + + def __init__(self, package, fixed_version, release, urgency): + self.id = None + self.package = package + if (fixed_version is not None + and type(fixed_version) == types.StringType): + self.fixed_version = debian_support.Version(fixed_version) + else: + self.fixed_version = fixed_version + if release == '''': + self.release = None + else: + if type(release) == types.StringType: + release = debian_support.internRelease(release) + if release is None: + raise ValueError, "invalid release" + self.release = release + if type(urgency) == types.StringType: + urgency = internUrgency(urgency) + if urgency is None: + raise ValueError, "invalid urgency" + self.urgency = urgency + self.bugs = [] + + def affects(self, version, release=None): + """Returns true if this package note affects the given version. 
+ + Both version and release can be strings. In this case, they + are automatically promoted to the correct Python objects. + """ + + if type(version) == types.StringType: + version = debian_support.Version(version) + if type(release) == types.ReleaseType: + release = Release(release) + + if release is None: + if self.release is not None: + # If there''s a release spec, and we are running for + # testing, this note does apply. + return False + else: + if self.release is not None and self.release <> release: + # If there''s a release spec, it must match ours. + return False + # Standard version comparison if the releases match. + return self.version is None or version < self.version + + def writeDB(self, cursor, bug_name): + """Writes the object to an SQLite database. + + If the id attibute is already set, it is assumed that the + object has already been written. + """ + + if self.id is not None: + return + + if self.fixed_version: + v = str(self.fixed_version) + else: + v = None + cursor.execute("""INSERT INTO package_notes + (bug_name, package, fixed_version, release, urgency) + VALUES (?, ?, ?, ?, ?)""", + (bug_name, self.package, v, self.release or '''', + str(self.urgency))) + for (rowid,) in cursor.execute(''SELECT last_insert_rowid()''): + self.id = rowid + for b in self.bugs: + cursor.execute("""INSERT INTO debian_bugs (bug, note) + VALUES (?, ?)""", (b, rowid)) + return + assert False + + def loadBugs(self, cursor): + assert type(self.id) == types.IntType, self.id + assert len(self.bugs) == 0 + for (b,) in cursor.execute\ + ("SELECT bug FROM debian_bugs WHERE note = ?", (self.id,)): + self.bugs.append(int(b)) + +class PackageNoteFromDB(PackageNote): + def __init__(self, cursor, nid): + for bug_name, package, fixed_version, release, urgency \ + in cursor.execute\ + ("""SELECT bug_name, package, fixed_version, release, urgency + FROM package_notes WHERE id = ?""", (nid,)): + PackageNote.__init__(package, fixed_version, release, urgency) + self.id = nid + 
self.bug_name = bug_name + self.loadBugs(cursor) + return + raise ValueError, "invalid package note ID %d" % id + +class PackageNoteParsed(PackageNote): + """Subclass with a constructor that parses package notes.""" + + re_bug = re.compile(r''^bug #(\d+)$'') + re_notes_split = re.compile(r''\s*;\s+'') + + def __init__(self, package, version, notes): + rel = None + bugs = [] + urgency = "unknown" + if notes is not None: + for n in self.re_notes_split.split(notes): + u = internUrgency(n) + if u: + urgency = u + continue + + r = debian_support.internRelease(n) + if r: + rel = r + continue + + match = self.re_bug.match(n) + if match: + (bug,) = match.groups() + bugs.append(int(bug)) + continue + + if n == ''unfixed'': + self.unfixed = True + continue + + raise SyntaxError , ''unknown package note %s\n'' % `n` + PackageNote.__init__(self, package, version, rel, urgency) + self.bugs = bugs + +class BugBase: + "Base class for entries in the bug list.""" + + re_cve_name = re.compile(r''^(?:CAN|CVE)-\d{4}-\d{4}$'') + + def __init__(self, fname, lineno, date, name, description, comments): + assert type(fname) == types.StringType + assert type(lineno) == types.IntType + self.source_file = fname + self.source_line = lineno + self.date = date + self.name = name + self.description = description + self.comments = comments + self.notes = [] + self.xref = [] + self.not_for_us = False + + def isFromCVE(self): + """Returns True if the name has been officially assigned. + + Our database is mostly CVE-driven, but sometimes we need names + which have not been assigned yet. Therefore, we generate + identifiers on the fly. 
+ """ + return self.re_cve_name.match(self.name) is not None + + def cveStatus(self): + if self.isFromCVE(): + if self.name[0:4] == ''CVE'': + return ''ASSIGNED'' + return ''CANDIDATE'' + else: + return '''' + + def writeDB(self, cursor): + """Writes the record to an SQLite3 database.""" + + if self.not_for_us: + not_for_us = 1 + else: + not_for_us = 0 + + import apsw + try: + cursor.execute("""INSERT INTO bugs + (name, cve_status, not_for_us, description, + source_file, source_line) + VALUES (?, ?, ?, ?, ?, ?)""", + (self.name, self.cveStatus(), not_for_us, + self.description, + self.source_file, self.source_line)) + except apsw.ConstraintError: + raise ValueError, "bug name %s is not unique" % self.name + + for (typ, c) in self.comments: + cursor.execute("""INSERT INTO bugs_notes + (bug_name, typ, comment) VALUES (?, ?, ?)""", + (self.name, typ, c)) + + for n in self.notes: + n.writeDB(cursor, self.name) + + for x in self.xref: + try: + cursor.execute("""INSERT INTO bugs_xref + (source, target) VALUES (?, ?)""", + (self.name, x)) + except apsw.ConstraintError: + raise ValueError, \ + "cross reference to %s appears multiple times" % x + +class Bug(BugBase): + """Class for bugs for which we have some data.""" + + def __init__(self, fname, lineno, date, name, description, comments, notes, + xref, not_for_us=False): + assert len(notes) == 0 or isinstance(notes[0], PackageNote) + assert len(xref) == 0 or type(xref[0]) == types.StringType + assert type(not_for_us) == types.BooleanType + BugBase.__init__(self, fname, lineno, date, name, + description, comments) + self.notes = notes + self.xref = xref + self.not_for_us = not_for_us + +class BugFromDB(Bug): + def __init__(self, cursor, name): + assert type(name) == types.StringType + for r in cursor.execute(''SELECT * FROM bugs WHERE name = ?'', (name,)): + rdesc = cursor.getdescription() + data = {} + for j in range(len(rdesc)): + data[rdesc[j][0]] = r[j] + # FIXME: load date + Bug.__init__(self, data[''source_file''], 
data[''source_line''], + None, name, data[''description''], comments=[], + notes=[], xref=[], + not_for_us=not not data[''not_for_us'']) + for (x,) in cursor.execute\ + (''SELECT target FROM bugs_xref WHERE source = ?'', (name,)): + self.xref.append(x) + for (t, c) in cursor.execute\ + ("""SELECT typ, comment FROM bugs_notes + WHERE bug_name = ? + ORDER BY rowid""", + (name,)): + self.comments.append((t, c)) + + # temporary list required because loadBugs needs the cursor + for nid, package, fixed_version, release, urgency \ + in list(cursor.execute + ("""SELECT id, package, fixed_version, release, urgency + FROM package_notes WHERE bug_name = ?""", (name,))): + n = PackageNote(package, fixed_version, release, urgency) + n.id = nid + n.bug_name = name + n.loadBugs(cursor) + self.notes.append(n) + return + raise ValueError, "unknown bug " + `name` + +class BugReservedCVE(BugBase): + """Class for reserved CVE entries.""" + def __init__(self, fname, lineno, name, comments=None): + if comments is None: + comments = [] + BugBase.__init__(self, fname, lineno, None, name, "RESERVED", comments) + def cveStatus(self): + return ''RESERVED'' + +class BugRejectedCVE(BugBase): + """Class for rejected CVE entries.""" + def __init__(self, fname, lineno, name): + BugBase.__init__(self, fname, lineno, None, name, "REJECTED", []) + def cveStatus(self): + return ''REJECTED'' + +class FileBase(debian_support.PackageFile): + re_non_ascii = re.compile(r''.*([^\n\t -~]).*'') + re_empty = re.compile(r''^(?:\s*$|--)'') + re_indent = re.compile(r''^\s+(.*?)\s*$'') + re_begin_claim = re.compile(r''^begin claimed by (\S+)\s*$'') + re_end_claim = re.compile(r''^end claimed by (\S+)\s*$'') + re_stop = re.compile(r''^STOP:'') + + re_xref_required = re.compile(r''^\{'') + re_xref = re.compile(r''^\{\s*([^\}]+?)\s*\}$'') + re_whitespace = re.compile(r''\s+'') + re_xref_entry = re.compile(''^(?:(?:CAN|CVE)-\d{4}-\d{4}'' + + r''|VU#\d{6}'' + + r''|DSA-\d+(?:-\d+)?|DTSA-\d+-\d+)$'') + + 
re_package_required = re.compile(r''^-'') + re_package = re.compile(r''^- ([A-Za-z0-9:.+-]+)'' + + r''(?:\s+([A-Za-z0-9:.+-]+))?\s*(?:\((.*)\))?$'') + re_not_for_us_required = re.compile(r''^NOTE:\s+not?e?-fo?r-u'') + re_not_for_us = re.compile(r''^NOTE:\s+not-for-us(?:\s+\((.*)\))?\s*$'') + re_reserved = re.compile(r''^NOTE:\s+reserved\s*$'') + re_rejected = re.compile(r''^NOTE:\s+rejected\s*$'') + re_note = re.compile(r''^NOTE:\s+(.*)$'') + re_todo = re.compile(r''^TODO:\s+(.*)$'') + + def isUniqueName(self, name): + """Returns True if the name is a real, unique name.""" + return True + + def matchHeader(self, line): + """Parses the header of a record. + + Must be overriden by child classes.""" + assert False + + def getLine(self): + while 1: + self.line = self.file.readline() + self.lineno += 1 + + if self.line == '''' or not self.re_empty.match(self.line): + break + + match = self.re_non_ascii.match(self.line) + if match is not None: + self.raiseSyntaxError(''invalid non-printable character %s'' + % `match.groups()[0]`) + + def rawRecords(self): + """Generator which returns raw records. + + These records are 4-tuples with the following contents: + + - line number of the start of the record + - release data; can be None + - something which resembles a CVE name; is not necessarily unique + if it does not match the CVE syntax + - part of the CVE description + - subrecords, a list of pairs line number/string + """ + + self.getLine() + record = [] + while self.line: + first_line = self.lineno + + if self.re_stop.match(self.line): + # Theoretically, we could stop here, but we want + # syntax checks for the remaining records, too. + self.getLine() + continue + + # We ignore claims, but check their syntax nevertheless. 
+ match = self.re_begin_claim.match(self.line) + if match: + self.getLine() + continue + match = self.re_end_claim.match(self.line) + if match: + self.getLine() + continue + + (date, record_name, description) = self.matchHeader(self.line) + + record = [] + while self.line: + self.getLine() + + match = self.re_indent.match(self.line) + if match: + (r,) = match.groups() + record.append((self.lineno, r)) + else: + break + # line contains the next line at this point. + + yield (first_line, date, record_name, description, record) + + def __iter__(self): + """Generator for Bug objects.""" + for (first_lineno, date, record_name, description, record)\ + in self.rawRecords(): + + not_for_us = None + xref = [] + pkg_notes = [] + comments = [] + cve_reserved = False + cve_rejected = False + first_bug = 0 + + for (lineno, r) in record: + if self.re_xref_required.match(r): + match = self.re_xref.match(r) + if match: + (xref_string,) = match.groups() + for x in self.re_whitespace.split(xref_string): + if self.re_xref_entry.match(x): + xref.append(x) + else: + self.raiseSyntaxError\ + ("invalid cross reference " + `x`, lineno) + continue + else: + self.raiseSyntaxError("expected cross reference, got: " + + `r`, lineno) + + if self.re_package_required.match(r): + match = self.re_package.match(r) + if match: + (p, v, d) = match.groups() + if v is None and d is None and \ + self.no_version_needs_note: + raise SyntaxError, \ + ''version-less package entry requires note'' + + if v == ''not-affected'': + # ''0'' is the minimum version number possible. + pkg_notes.append(PackageNoteParsed(p, ''0'', None)) + # ''d'' is a free-form field in this case. 
+ comments.append((''NOTE'', d)) + else: + x = PackageNoteParsed(p, v, d) + pkg_notes.append(x) + if first_bug == 0 and len(x.bugs) > 0: + first_bug = x.bugs[0] + else: + self.raiseSyntaxError("expected package entry, got: " + + `r`, lineno) + continue + + if self.re_not_for_us_required.match(r): + match = self.re_not_for_us.match(r) + if match: + (not_for_us,) = match.groups() + if not_for_us is None: + not_for_us = '''' + continue + else: + self.raiseSyntaxError("expected not-for-us entry, " + + "got: " + `r`, lineno) + + match = self.re_reserved.match(r) + if match: + cve_reserved = True + continue + + match = self.re_rejected.match(r) + if match: + cve_rejected = True + continue + + match = self.re_note.match(r) + if match: + (note,) = match.groups() + comments.append((''NOTE'', note)) + continue + + match = self.re_todo.match(r) + if match: + (todo,) = match.groups() + comments.append((''TODO'', todo)) + continue + + self.raiseSyntaxError(''expected CAN/CVE annotation, got: %s'' + % `r`, lineno) + break + + if cve_reserved: + if not self.isUniqueName(record_name): + self.raiseSyntaxError\ + (''reserved CVE entries must have CAN/CVE names'', + first_lineno) + if len(pkg_notes) > 0: + # The bug has extra data even though it is marked + # reserved by CVE, we have to issue the full + # version because the official CVE lags a bit. 
+ yield Bug(self.file.name, first_lineno, date, + record_name, description, comments, + notes=pkg_notes, xref=xref) + else: + yield BugReservedCVE(self.file.name, first_lineno, + record_name, comments) + + elif cve_rejected: + if not self.isUniqueName(record_name): + self.raiseSyntaxError\ + (''rjeected CVE entries must have CAN/CVE names'', + first_lineno) + if len(pkg_notes) > 0: + self.raiseSyntaxError\ + (''rejected CVE entries must not have notes'', + first_lineno) + yield BugRejectedCVE(self.file.name, first_lineno, record_name) + + elif not_for_us is not None: + if not self.isUniqueName(record_name): + self.raiseSyntaxError\ + (''not-for-us bug must have CAN/CVE name'', first_lineno) + if len(pkg_notes) > 0: + self.raiseSyntaxError\ + (''package information not allowed in not-for-us bugs'', + first_lineno) + yield Bug(self.file.name, first_lineno, date, + record_name, description, comments, notes=[], + xref=xref, not_for_us=True) + else: + if not self.isUniqueName(record_name): + record_name = ''FAKE-%07d-%06d'' % (first_bug, first_lineno) + yield Bug(self.file.name, first_lineno, date, + record_name, description, + comments, notes=pkg_notes, xref=xref) + +class CVEFile(FileBase): + """A CVE file, as used by the Debian testing security team.""" + + re_cve = re.compile(r''^((?:CAN|CVE)-\d{4}-(?:\d{4}|XXXX))\s+(.*?)\s*$'') + + def __init__(self, name, fileObj=None, no_version_needs_note=True): + FileBase.__init__(self, name, fileObj) + self.no_version_needs_note = no_version_needs_note + + def isUniqueName(self, name): + return BugBase.re_cve_name.match(name) is not None + + def matchHeader(self, line): + match = self.re_cve.match(line) + if not match: + self.raiseSyntaxError("expected CVE record, got: %s" % `line`) + (record_name, description) = match.groups() + return (None,) + match.groups() + +class DSAFile(FileBase): + """A DSA file. + + Similar to a CVE file, only that it contains DSAs as its main + reference point, and release dates. 
+ """ + + re_dsa = re.compile(r''^\[(\d\d) ([A-Z][a-z][a-z]) (\d{4})\] '' + + r''(DSA-\d+(?:-\d+)?)\s+'' + + r''(.*?)\s*$'') + + month_names = {''Jan'': 1, + ''Feb'': 2, + ''Mar'': 3, + ''Apr'': 4, + ''May'': 5, + ''Jun'': 6, + ''Jul'': 7, + ''Aug'': 8, + ''Sep'': 9, + ''Oct'': 10, + ''Nov'': 11, + ''Dec'': 12} + + # temporary hack, until we know what "!" actually means. + re_package_required = re.compile(r''^[-!]'') + re_package = re.compile(r''^[-!] ([A-Za-z0-9:.+-]+)'' + + r''(?:\s+([A-Za-z0-9:.+-]+))?\s*(?:\((.*)\))?$'') + def matchHeader(self, line): + match = self.re_dsa.match(line) + if not match: + self.raiseSyntaxError("expected DSA record, got: %s" % `line`) + (record_name, description) = match.groups() + (day, month, year, name, desc) = match.groups() + try: + month = self.month_names[month] + except KeyError: + self.raiseSyntaxError("invalid month name %s" % `month`) + return ("%s-%02d-%s" % (year, month, day), name, desc) + +class DTSAFile(FileBase): + """A DTSA file. + + Like a DSA file, but the date format is different. 
+ """ + + re_dsa = re.compile\ + (r''^\[([A-Z][a-z]{3,}) (\d\d?)(?:st|nd|rd|th), (\d{4})\] '' + + r''(DTSA-\d+-\d+)\s+'' + + r''(.*?)\s*$'') + month_names = {''January'': 1, + ''February'': 2, + ''March'': 3, + ''April'': 4, + ''May'': 5, + ''June'': 6, + ''July'': 7, + ''August'': 8, + ''September'': 9, + ''October'': 10, + ''November'': 11, + ''December'': 12} + + def matchHeader(self, line): + match = self.re_dsa.match(line) + if not match: + self.raiseSyntaxError("expected DTSA record, got: %s" % `line`) + (record_name, description) = match.groups() + (month, day, year, name, desc) = match.groups() + try: + month = self.month_names[month] + except KeyError: + self.raiseSyntaxError("invalid month name %s" % `month`) + return ("%s-%02d-%02d" % (year, month, int(day)), name, desc) + +def test(): + assert internUrgency("high") > internUrgency("medium") + + assert FileBase.re_non_ascii.match(''illegal \xf6 character\n'') + + note = PackageNoteParsed(''chmlib'', ''0.36-1'', ''bug #327431; medium'') + assert note.bugs == [327431] + assert note.package == ''chmlib'' + assert note.fixed_version == debian_support.Version(''0.36-1'') + assert note.urgency == internUrgency(''medium'') + + for p in CVEFile(''../../data/CAN/list''): + pass + +if __name__ == "__main__": + test() Added: lib/python/debian_support.py ==================================================================--- lib/python/debian_support.py 2005-09-12 16:27:44 UTC (rev 1933) +++ lib/python/debian_support.py 2005-09-12 16:32:23 UTC (rev 1934) @@ -0,0 +1,187 @@ +# debian_support.py -- Python module for Debian metadata +# Copyright (C) 2005 Florian Weimer <fw@deneb.enyo.de> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +"""This module implements facilities to deal with Debian-specific metadata.""" + +import re + +class Version: + """This class implements Debian version numbers.""" + + def __init__(self, version): + """Creates a new Version object.""" + self.__asString = version + self.__parsed = self.__parse(version) + + def __str__(self): + return self.__asString + + def __repr__(self): + return ''Version(%s)'' % `self.__asString` + + def __cmp__(self, other): + """Compares two versions. + + This method implements the algorithm in the Debian Policy.""" + return cmp(self.__parsed, other.__parsed) + + def __parse(self, v, regexp=\ + re.compile(r''^(?:(\d+):)?([A-Za-z0-9.+:-]+?)'' + + r''(?:-([A-Za-z0-9.+]+))?$'')): + match = regexp.match(v) + if match is None: + raise ValueError, "invalid Debian version string" + (epoch, upstream, debian) = match.groups() + if epoch is None: + epoch = 0 + else: + epoch = int(epoch) + return (epoch, self.__parse_1(upstream), self.__parse_1(debian)) + + def __parse_1(self, x, non_digits=re.compile(r''^([^0-9]*)(.*)$''), + digits=re.compile(r''^([0-9]*)(.*)$'')): + l = [] + while x is not None and x <> '''': + (nd, x) = non_digits.match(x).groups() + (d, x) = digits.match(x).groups() + if d == '''': + d = 0 + else: + d = int(d) + l.append(nd) + l.append(d) + return l + +class PackageFile: + """A Debian package file. 
+ + Objects of this class can be used to read Debian''s Source and + Packages files.""" + + re_field = re.compile(r''^([A-Za-z][A-Za-z0-9-]+):\s+(.*?)\s*$'') + re_continuation = re.compile(r''^\s+(?:\.|(\S.*?)\s*)$'') + + def __init__(self, name, fileObj=None): + """Creates a new package file object. + + name - the name of the file the data comes from + fileObj - an alternate data source; the default is to open the + file with the indicated name. + """ + if fileObj is None: + fileObj = file(name) + self.name = name + self.file = fileObj + self.lineno = 0 + + def __iter__(self): + line = self.file.readline() + self.lineno += 1 + pkg = [] + while line: + if line == ''\n'': + if len(pkg) == 0: + self.raiseSyntaxError(''expected package record'') + yield pkg + pkg = [] + line = self.file.readline() + self.lineno += 1 + continue + + match = self.re_field.match(line) + if not match: + self.raiseSyntaxError("expected package field") + (name, contents) = match.groups() + + while True: + line = self.file.readline() + self.lineno += 1 + match = self.re_continuation.match(line) + if match: + (ncontents,) = match.groups() + if ncontents is None: + ncontents = "" + contents = "%s\n%s" % (contents, ncontents) + else: + break + pkg.append((name, contents)) + + def raiseSyntaxError(self, msg, lineno=None): + e = SyntaxError(msg) + e.filename = self.name + if lineno is None: + e.lineno = self.lineno + else: + e.lineno = lineno + raise e + +class PseudoEnum: + """A base class for types which resemble enumeration types.""" + def __init__(self, name, order): + self._name = name + self._order = order + def __repr__(self): + return ''%s(%s)''% (self.__class__._name__, `name`) + def __str__(self): + return self._name + def __cmp__(self, other): + return cmp(self._order, other._order) + def __hash__(self): + return hash(self._order) + +class Release(PseudoEnum): pass + +def listReleases(): + releases = {} + rels = ("woody", "sarge", "etch") + for r in range(len(rels)): + releases[rels[r]] 
= Release(rels[r], r) + Release.releases = releases + return releases +def internRelease(name, releases=listReleases()): + if releases.has_key(name): + return releases[name] + else: + return None +del listReleases + +def test(): + # Version + assert Version(''0'') < Version(''a'') + assert Version(''1.0'') < Version(''1.1'') + assert Version(''1.2'') < Version(''1.11'') + assert Version(''1.0-0.1'') < Version(''1.1'') + assert Version(''1.0-0.1'') < Version(''1.0-1'') + assert Version(''1.0-0.1'') == Version(''1.0-0.1'') + assert Version(''1.0-0.1'') < Version(''1.0-1'') + assert Version(''1.0final-5sarge1'') > Version(''1.0final-5'') \ + > Version(''1.0a7-2'') + assert Version(''0.9.2-5'') < Version(''0.9.2+cvs.1.0.dev.2004.07.28-1.5'') + assert Version(''1:500'') < Version(''1:5000'') + assert Version(''100:500'') > Version(''11:5000'') + + # Release + assert internRelease(''sarge'') < internRelease(''etch'') + + # PackageFile + # for p in PackageFile(''../../data/packages/sarge/Sources''): + # assert p[0][0] == ''Package'' + # for p in PackageFile(''../../data/packages/sarge/Packages.i386''): + # assert p[0][0] == ''Package'' + +if __name__ == "__main__": + test() Added: lib/python/security_db.py ==================================================================--- lib/python/security_db.py 2005-09-12 16:27:44 UTC (rev 1933) +++ lib/python/security_db.py 2005-09-12 16:32:23 UTC (rev 1934) @@ -0,0 +1,430 @@ +# security_db.py -- simple, CVE-driven Debian security bugs database +# Copyright (C) 2005 Florian Weimer <fw@deneb.enyo.de> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. 
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""This module implements a small database for tracking security bugs.

Note that the database is always secondary to the text files.  The
database is only an implementation tool, and not used for maintaining
the data.

The data is kept in a SQLite 3 database.

FIXME: Document the database schema once it is finished.
"""

import apsw
import bugs
import debian_support
import re
import sys
import types


class InsertError(Exception):
    """Class for capturing insert errors.

    The 'errors' member collects all error messages (a non-empty list
    of strings).
    """

    def __init__(self, errors):
        assert isinstance(errors, list), errors
        assert len(errors) > 0, errors
        assert isinstance(errors[0], str), errors
        self.errors = errors

    def __str__(self):
        """Returns the first message, marked if further ones exist."""
        first = self.errors[0]
        # Only hint at additional messages when there really are some.
        if len(self.errors) > 1:
            return first + ' [more...]'
        return first


class DB:
    """Access to the security database.

    This is a wrapper around an SQLite database object (which is
    accessible as the "db" member).

    Most operations need a special cursor object, which can be created
    with the cursor method.  The name "cursor" is somewhat of a
    misnomer because these objects are quite versatile.
    """

    def __init__(self, name):
        """Opens the database; name is passed to apsw.Connection."""
        self.db = apsw.Connection(name)

    def cursor(self):
        """Creates a new database cursor.

        Also see the writeTxn method."""
        return self.db.cursor()

    def writeTxn(self):
        """Creates a cursor for an exclusive transaction.

        No other process may modify the database at the same time.
        After finishing the work, you should invoke the commit or
        rollback methods below.
        """
        c = self.cursor()
        c.execute("BEGIN TRANSACTION EXCLUSIVE")
        return c

    def commit(self, cursor):
        """Makes the changes in the transaction permanent."""
        cursor.execute("COMMIT")

    def rollback(self, cursor):
        """Undoes the changes in the transaction."""
        cursor.execute("ROLLBACK")

    def initSchema(self):
        """Creates the database schema."""
        cursor = self.cursor()

        cursor.execute("""CREATE TABLE source_packages
            (package TEXT NOT NULL,
            release TEXT NOT NULL, subrelease TEXT NOT NULL,
            version TEXT NOT NULL,
            PRIMARY KEY (package, release, subrelease));""")

        cursor.execute("""CREATE TABLE binary_packages
            (package TEXT NOT NULL,
            release TEXT NOT NULL, subrelease TEXT NOT NULL,
            architecture TEXT NOT NULL,
            version TEXT NOT NULL,
            source TEXT NOT NULL, source_version TEXT NOT NULL,
            PRIMARY KEY (package, release, subrelease, architecture));""")
        cursor.execute("""CREATE INDEX binary_packages_source
            ON binary_packages(source)""")

        cursor.execute("""CREATE TABLE package_notes
            (id INTEGER NOT NULL PRIMARY KEY,
            bug_name TEXT NOT NULL,
            package TEXT NOT NULL,
            fixed_version TEXT
                CHECK (fixed_version IS NULL OR fixed_version <> ''),
            release TEXT NOT NULL,
            urgency TEXT NOT NULL)""")

        cursor.execute("""CREATE TABLE debian_bugs
            (bug INTEGER NOT NULL,
            note INTEGER NOT NULL,
            PRIMARY KEY (bug, note))""")

        cursor.execute("""CREATE TABLE bugs
            (name TEXT NOT NULL PRIMARY KEY,
            cve_status TEXT NOT NULL
                CHECK (cve_status IN
                    ('', 'CANDIDATE', 'ASSIGNED', 'RESERVED', 'REJECTED')),
            not_for_us INTEGER NOT NULL CHECK (not_for_us IN (0, 1)),
            description TEXT NOT NULL,
            source_file TEXT NOT NULL,
            source_line INTEGER NOT NULL)""")

        # The bug_name constraint used to test "typ <> ''", which is
        # already implied by the IN (...) check on typ and left
        # bug_name itself unchecked.
        cursor.execute("""CREATE TABLE bugs_notes
            (bug_name TEXT NOT NULL CHECK (bug_name <> ''),
            typ TEXT NOT NULL CHECK (typ IN ('TODO', 'NOTE')),
            release TEXT NOT NULL DEFAULT '',
            comment TEXT NOT NULL CHECK (comment <> ''))""")

        cursor.execute("""CREATE TABLE bugs_xref
            (source TEXT NOT NULL,
            target TEXT NOT NULL,
            normalized_target TEXT NOT NULL DEFAULT '',
            PRIMARY KEY (source, target))""")

    def updateSources(self, cursor, release, subrelease, packages):
        """Reads a Sources file and adds it to the database.

        Old records for the same release/subrelease pair are removed.

        cursor - cursor used to update the database
        release - Debian release (e.g. sarge)
        subrelease - fork of a release (e.g. security)
        packages - debian_support.PackageFile object with source packages

        Raises SyntaxError if a record lacks a Package or Version field.
        """

        cursor.execute('DELETE FROM source_packages '
                       + 'WHERE release = ? AND subrelease = ?',
                       (release, subrelease))

        for pkg in packages:
            pkg_name = None
            pkg_version = None
            for (name, contents) in pkg:
                if name == "Package":
                    pkg_name = contents
                elif name == "Version":
                    pkg_version = debian_support.Version(contents)
            if pkg_name is None:
                raise SyntaxError(
                    "package record does not contain package name")
            if pkg_version is None:
                raise SyntaxError(
                    "package record for %s does not contain version"
                    % pkg_name)
            cursor.execute('INSERT INTO source_packages '
                           + '(package, release, subrelease, version) '
                           + 'VALUES (?, ?, ?, ?)',
                           (pkg_name, release, subrelease, str(pkg_version)))

    def updatePackages(self, cursor,
                       release, subrelease, architecture,
                       packages):
        """Reads a Packages file and adds it to the database.

        Old records for the same release/subrelease/architecture
        triple are removed.

        cursor - cursor used to update the database
        release - Debian release (e.g. sarge)
        subrelease - fork of a release (e.g. security)
        architecture - architecture of binary packages (e.g. i386)
        packages - debian_support.PackageFile object with binary packages

        Raises SyntaxError if a record lacks a Package or Version field,
        or if its Source field cannot be parsed.
        """

        # Source field: "name" or "name (version)".
        re_source = re.compile(
            r'^([a-zA-Z0-9.+-]+)(?:\s+\(([a-zA-Z0-9.+:-]+)\))?$')

        cursor.execute('DELETE FROM binary_packages '
                       + 'WHERE release = ? AND subrelease = ? AND architecture = ?',
                       (release, subrelease, architecture))

        for pkg in packages:
            pkg_name = None
            pkg_version = None
            pkg_source = None
            pkg_source_version = None
            for (name, contents) in pkg:
                if name == "Package":
                    pkg_name = contents
                elif name == "Version":
                    pkg_version = debian_support.Version(contents)
                elif name == "Source":
                    match = re_source.match(contents)
                    if match is None:
                        raise SyntaxError(('binary package %s references '
                                           + 'invalid source package %s') %
                                          (pkg_name, repr(contents)))
                    (pkg_source, pkg_source_version) = match.groups()

            if pkg_name is None:
                raise SyntaxError(
                    "binary package record does not contain package name")
            if pkg_version is None:
                raise SyntaxError(
                    "binary record for %s does not contain version"
                    % pkg_name)
            # Without a Source field (or without a version in it), the
            # binary package's own name/version is used.
            if pkg_source is None:
                pkg_source = pkg_name
            if pkg_source_version is None:
                pkg_source_version = pkg_version

            cursor.execute('INSERT INTO binary_packages '
                           + '(package, release, subrelease, architecture,'
                           + 'version, source, source_version) '
                           + 'VALUES (?, ?, ?, ?, ?, ?, ?)',
                           (pkg_name, release, subrelease, architecture,
                            str(pkg_version),
                            pkg_source, str(pkg_source_version)))

    def deleteBugs(self, cursor):
        """Deletes all recorded bug reports from the database."""
        cursor.execute("DELETE FROM package_notes")
        cursor.execute("DELETE FROM debian_bugs")
        cursor.execute("DELETE FROM bugs")
        cursor.execute("DELETE FROM bugs_notes")
        cursor.execute("DELETE FROM bugs_xref")

    def insertBugs(self, cursor, source):
        """Reads the CAN/CVE/DSA/DTSA file and writes them to the database.

        Every bug in source is written with its writeDB method.
        ValueError exceptions are collected; if there were any, an
        InsertError carrying all messages is raised at the end.
        """

        errors = []
        for bug in source:
            try:
                bug.writeDB(cursor)
            except ValueError as e:
                errors.append("%s: %d: error: %s"
                              % (bug.source_file, bug.source_line, e))
        if errors:
            raise InsertError(errors)

    def finishBugs(self, cursor):
        """After inserting new bugs, update cross-references.

        Returns a list of warning messages."""

        warnings = []

        # The result sets are copied into lists because the same cursor
        # is used again inside the loop bodies.
        for b1, b2 in list(cursor.execute(
                """SELECT b1.name, b2.name FROM bugs AS b1, bugs AS b2
                WHERE b1.name LIKE 'CVE-%'
                AND b2.name = 'CAN-' || substr(b1.name, 5, 9)""")):
            b1 = bugs.BugFromDB(cursor, b1)
            b2 = bugs.BugFromDB(cursor, b2)

            warnings.append("%s:%d: duplicate CVE entries %s and %s"
                            % (b1.source_file, b1.source_line,
                               b1.name, b2.name))
            warnings.append("%s:%d: location of %s"
                            % (b1.source_file, b1.source_line, b1.name))
            warnings.append("%s:%d: location of %s"
                            % (b2.source_file, b2.source_line, b2.name))

        for source, target in list(cursor.execute(
                """SELECT source, target FROM bugs_xref
                WHERE normalized_target = ''""")):
            if bugs.BugBase.re_cve_name.match(target):
                # The entry may be filed under either prefix.
                can_target = 'CAN-' + target[4:]
                cve_target = 'CVE-' + target[4:]

                found = False
                for (t,) in list(cursor.execute(
                        """SELECT name FROM bugs
                        WHERE name IN (?, ?)""",
                        (can_target, cve_target))):
                    assert not found, t
                    cursor.execute("""UPDATE bugs_xref
                        SET normalized_target = ?
                        WHERE source = ? AND target = ?""",
                                   (t, source, target))
                    found = True
                if not found:
                    # Look up the referring bug to report its location.
                    b = bugs.BugFromDB(cursor, source)
                    warnings.append(
                        "%s: %d: reference to unknown CVE entry %s"
                        % (b.source_file, b.source_line, target))

        return warnings

    def check(self, cursor=None):
        """Runs a simple consistency check and prints the results.

        If no cursor is given, a new one is created.
        """

        if cursor is None:
            cursor = self.cursor()

        for (package, release, subrelease, architecture, source) in \
                cursor.execute(
                    """SELECT package, release, subrelease, architecture, source
                    FROM binary_packages
                    WHERE NOT EXISTS
                    (SELECT *
                    FROM source_packages AS sp
                    WHERE sp.package = binary_packages.source
                    AND sp.release = binary_packages.release
                    AND sp.subrelease = binary_packages.subrelease)
                    """):
            print("error: binary package without source package")
            print(" binary package: %s" % (package,))
            print(" release: %s" % (release,))
            if subrelease:
                print(" subrelease: %s" % (subrelease,))
            print(" architecture: %s" % (architecture,))
            print(" missing source package: %s" % (source,))

        # NOTE(review): the WHERE clause filters on
        # binary_packages.source_version, but the comparison below uses
        # binary_packages.version.  Possibly source_version was meant;
        # confirm before changing.
        for (package, release, subrelease, architecture, version,
             source, source_version) \
                in cursor.execute(
                    """SELECT binary_packages.package,
                    binary_packages.release, binary_packages.subrelease,
                    binary_packages.architecture,binary_packages.version,
                    sp.package, sp.version
                    FROM binary_packages, source_packages AS sp
                    WHERE sp.package = binary_packages.source
                    AND sp.release = binary_packages.release
                    AND sp.subrelease = binary_packages.subrelease
                    AND sp.version <> binary_packages.source_version"""):
            binary_v = debian_support.Version(version)
            source_v = debian_support.Version(source_version)
            assert binary_v != source_v
            if binary_v <= source_v:
                print("error: binary package is older than source package")
            else:
                print("warning: binary package is newer than source package")
            print(" binary package: %s (%s)" % (package, version))
            print(" source package: %s (%s)" % (source, source_version))
            print(" release: %s" % (release,))
            if subrelease:
                print(" subrelease: %s" % (subrelease,))
            print(" architecture: %s" % (architecture,))


def test():
    """Builds test_security.db from the data files and checks one bug."""
    import os

    # Start from a fresh database; the file is absent on a first run.
    if os.path.exists('test_security.db'):
        os.unlink('test_security.db')
    db = DB('test_security.db')
    db.initSchema()

    data_prefix = '../../data/packages/sarge/'
    if False:
        cursor = db.writeTxn()
        db.updateSources(cursor, 'sarge', '',
                         debian_support.PackageFile(data_prefix + 'Sources'))
        db.updateSources(cursor, 'sarge', 'security',
                         debian_support.PackageFile(data_prefix
                                                    + 'Sources.security'))
        db.updatePackages(cursor, 'sarge', '', 'i386',
                          debian_support.PackageFile(data_prefix
                                                     + 'Packages.i386'))
        db.updatePackages(cursor, 'sarge', 'security', 'i386',
                          debian_support.PackageFile(data_prefix
                                                     + 'Packages.security-i386'))
        db.commit(cursor)

    # db.check(cursor)

    cursor = db.writeTxn()
    db.deleteBugs(cursor)
    db.insertBugs(cursor, bugs.CVEFile('../../data/CAN/list'))
    db.insertBugs(cursor, bugs.CVEFile('../../data/CVE/list',
                                       no_version_needs_note=False))
    db.insertBugs(cursor, bugs.DSAFile('../../data/DSA/list'))
    db.insertBugs(cursor, bugs.DTSAFile('../../data/DTSA/list'))
    db.finishBugs(cursor)
    db.commit(cursor)

    b = bugs.BugFromDB(cursor, 'CAN-2005-2491')
    assert b.name == 'CAN-2005-2491', b.name
    assert b.description == '(Integer overflow in pcre_compile.c in Perl Compatible Regular ...)', b.description
    assert len(b.xref) == 2, b.xref
    assert not b.not_for_us
    assert 'DSA-800-1' in b.xref, b.xref
    assert 'DTSA-10-1' in b.xref, b.xref
    assert tuple(b.comments) == (('NOTE', 'gnumeric/goffice includes one as well; according to upstream not exploitable in gnumeric,'),
                                 ('NOTE', 'new copy will be included any way')),\
        b.comments

    assert len(b.notes) == 4, len(b.notes)

    for n in b.notes:
        assert n.release is None
        if n.package == 'pcre3':
            assert n.fixed_version == debian_support.Version('6.3-0.1etch1')
            assert tuple(n.bugs) == (324531,), n.bugs
            assert n.urgency == bugs.internUrgency('medium')
        elif n.package == 'python2.1':
            assert n.fixed_version == debian_support.Version('2.1.3dfsg-3')
            assert len(n.bugs) == 0, n.bugs
            assert n.urgency == bugs.internUrgency('medium')
        elif n.package == 'python2.2':
            assert n.fixed_version == debian_support.Version('2.2.3dfsg-4')
            assert len(n.bugs) == 0, n.bugs
            assert n.urgency == bugs.internUrgency('medium')
        elif n.package == 'python2.3':
            assert n.fixed_version == debian_support.Version('2.3.5-8')
            assert len(n.bugs) == 0, n.bugs
            assert n.urgency == bugs.internUrgency('medium')
        else:
            assert False


if __name__ == "__main__":
    test()

# Trailer of the commit mail this text was taken from:
# Property changes on: stamps
# Name: svn:ignore
#    + *