Author: fw
Date: 2005-12-15 11:37:40 +0000 (Thu, 15 Dec 2005)
New Revision: 3051
Modified:
bin/tracker_service.py
bin/update-db
lib/python/security_db.py
Log:
lib/python/security_db.py (DB):
Bump schema version.
(DB.initSchema):
Add debsecan_data table.
(DB.calculateDebsecan, DB.getDebsecan):
New methods.
bin/update-db:
Invoke calculateDebsecan.
bin/tracker_service.py (TrackerService):
Add support for debsecan/* pages.
(TrackerService.page_debsecan):
New method.
Modified: bin/tracker_service.py
==================================================================---
bin/tracker_service.py 2005-12-15 11:34:35 UTC (rev 3050)
+++ bin/tracker_service.py 2005-12-15 11:37:40 UTC (rev 3051)
@@ -98,6 +98,7 @@
self.register(''data/releases'',
self.page_data_releases)
self.register(''data/funny-versions'',
self.page_data_funny_versions)
self.register(''data/fake-names'',
self.page_data_fake_names)
+ self.register(''debsecan/**'', self.page_debsecan)
def page_home(self, path, params, url):
query = params.get(''query'',
('''',))[0]
@@ -854,6 +855,17 @@
make_table(gen(),
caption=("Bug", "Description"))])
+ def page_debsecan(self, path, params, url):
+ obj = ''/''.join(path)
+ data = self.db.getDebsecan(obj)
+ if data:
+ return BinaryResult(data)
+ else:
+ return self.create_page(
+ url, "Object not found",
+ [P("The requested debsecan object has not been
found.")],
+ status=404)
+
def create_page(self, url, title, body, search_in_page=False, status=200):
append = body.append
append(HR())
Modified: bin/update-db
==================================================================---
bin/update-db 2005-12-15 11:34:35 UTC (rev 3050)
+++ bin/update-db 2005-12-15 11:37:40 UTC (rev 3051)
@@ -75,6 +75,11 @@
print x
sys.exit(1)
+# debsecan data
+
+for release in ('''', ''woody'',
''sarge'', ''etch''):
+ db.calculateDebsecan(release)
+
# Everything worked well.
-
+
db.commit(cursor)
Modified: lib/python/security_db.py
==================================================================---
lib/python/security_db.py 2005-12-15 11:34:35 UTC (rev 3050)
+++ lib/python/security_db.py 2005-12-15 11:37:40 UTC (rev 3051)
@@ -27,6 +27,7 @@
"""
import apsw
+import base64
import bugs
import cPickle
import cStringIO
@@ -37,6 +38,7 @@
import re
import sys
import types
+import zlib
class InsertError(Exception):
"""Class for capturing insert errors.
@@ -111,7 +113,7 @@
self.db = apsw.Connection(name)
self.verbose = verbose
- self.schema_version = 17
+ self.schema_version = 18
self._initFunctions()
c = self.cursor()
@@ -309,6 +311,11 @@
loss_sec_prot_other INTEGER NOT NULL)""")
cursor.execute(
+ """CREATE TABLE debsecan_data
+ (name TEXT NOT NULL PRIMARY KEY,
+ data TEXT NOT NULL)""")
+
+ cursor.execute(
"""CREATE VIEW testing_status AS
SELECT DISTINCT sp.name AS package, st.bug_name AS bug,
sp.archive AS section, st.urgency AS urgency,
@@ -1219,6 +1226,133 @@
VALUES (?, ?, ?, ?)""",
(bug_name, suite, status, pkgs))
+ def calculateDebsecan(self, release):
+ """Create data for the debsecan tool."""
+
+ c = self.cursor()
+
+ c.execute("""CREATE TEMPORARY TABLE vulnlist (
+ name TEXT NOT NULL,
+ package TEXT NOT NULL,
+ note INTEGER NOT NULL,
+ PRIMARY KEY (name, package)
+ )""")
+
+ # Populate the table with the unstable vulnerabilities;
+ # override them with the release-specific status.
+
+ c.execute("""INSERT INTO vulnlist
+ SELECT bug_name, package, id FROM package_notes WHERE release =
''''""")
+
+ if release:
+ c.execute("""INSERT OR REPLACE INTO vulnlist
+ SELECT bug_name, package, id FROM package_notes
+ WHERE release = ?""", (release,))
+
+ c.execute("""DELETE FROM vulnlist WHERE name LIKE
''FAKE-0000000-%''""")
+
+ urgency_to_flag = {''low'' : ''L'',
''medium'' : ''M'', ''high'' :
''H'',
+ ''unknown'' : '' ''}
+
+ result = ["VERSION 0\n"]
+ for (name, package, fixed_version, kind, urgency, remote, description,
+ note_id) in list(c.execute("""SELECT
+ vulnlist.name, vulnlist.package,
+ COALESCE(n.fixed_version, ''''),
+ n.package_kind, n.urgency,
+ (SELECT range_remote FROM nvd_data
+ WHERE cve_name = vulnlist.name) AS remote,
+ bugs.description,
+ n.id
+ FROM vulnlist, bugs, package_notes AS n
+ WHERE bugs.name = vulnlist.name
+ AND n.id = vulnlist.note
+ ORDER BY vulnlist.package""")):
+ if fixed_version == ''0'' or urgency ==
''unimportant'' \
+ or kind not in (''source'',
''binary'', ''unknown''):
+ continue
+
+ # Normalize FAKE-* names a bit. The line number (which
+ # makes the name unique) is completely useless for the
+ # client.
+
+ if name[0:5] == "FAKE-":
+ name =
''-''.join(name.split(''-'')[0:2])
+
+ # Determine if a fix is available for the specific
+ # release.
+
+ fix_available = '' ''
+ if release:
+ fix_available = '' ''
+ if kind == ''source'':
+ fix_available_sql = """SELECT st.vulnerable
+ FROM source_packages AS p, source_package_status AS st
+ WHERE p.name = ?
+ AND p.release = ?
+ AND p.subrelease IN ('''',
''security'')
+ AND st.bug_name = ?
+ AND st.package = p.rowid
+ ORDER BY p.version COLLATE version
DESC"""
+ elif kind == ''binary'':
+ fix_available_sql = """SELECT st.vulnerable
+ FROM binary_packages AS p, binary_package_status AS st
+ WHERE p.name = ?
+ AND p.release = ?
+ AND p.subrelease IN ('''',
''security'')
+ AND st.bug_name = ?
+ AND st.package = p.rowid
+ ORDER BY p.version COLLATE version
DESC"""
+ else:
+ fix_available_sql = ''''
+
+ if fix_available_sql:
+ for (v,) in c.execute(fix_available_sql,
+ (package, release, name)):
+ assert v is not None
+ if not v:
+ fix_available = ''F''
+ break
+ elif fixed_version <> '''':
+ fix_available = ''F''
+
+ if kind == ''source'':
+ kind = ''S''
+ elif kind == ''binary'':
+ kind = ''B''
+ else:
+ kind = '' ''
+
+ if remote is None:
+ remote = ''?''
+ elif remote:
+ remote = ''R''
+ else:
+ remote = '' ''
+
+ result.append("%s,%c%c%c%c,%s,%s,%s\n"
+ % (name,
+ kind, urgency_to_flag[urgency], remote,
+ fix_available,
+ package, fixed_version, description))
+ result =
base64.encodestring(zlib.compress(''''.join(result), 9))
+
+ if not release:
+ release = ''sid''
+ c.execute(
+ "INSERT OR REPLACE INTO debsecan_data (name, data) VALUES (?,
?)",
+ (''release/'' + release, result))
+
+ c.execute("DROP TABLE vulnlist")
+
+ def getDebsecan(self, name):
+ """Returns the debsecan data item
NAME."""
+ for (data,) in self.cursor().execute(
+ "SELECT data FROM debsecan_data WHERE name = ?",
(name,)):
+ return base64.decodestring(data)
+ else:
+ return None
+
def replaceNVD(self, cursor, data):
"""Replaces the stored NVD data."""
cursor.execute("DELETE FROM nvd_data");