Author: fw Date: 2005-10-20 09:02:12 +0000 (Thu, 20 Oct 2005) New Revision: 2482 Added: bin/tracker_service.py lib/python/web_support.py Log: r614@deneb: fw | 2005-10-13 22:12:28 +0200 Add new web front end. bin/tracker_service.py, lib/python/web_support.py: New files. Added: bin/tracker_service.py ==================================================================--- bin/tracker_service.py 2005-10-20 09:01:57 UTC (rev 2481) +++ bin/tracker_service.py 2005-10-20 09:02:12 UTC (rev 2482) @@ -0,0 +1,816 @@ +#!/usr/bin/python + +import sys +sys.path.insert(0,''../lib/python'') + +if len(sys.argv) <> 3: + print "usage: python tracker_serivce.py SOCKET-PATH DATABASE-PATH" + sys.exit(1) +socket_name = sys.argv[1] +db_name = sys.argv[2] + +import bugs +import re +import security_db +from web_support import * + +class TrackerService(WebService): + head_contents = compose(STYLE( + """h1 { font-size : 144%; } +h2 { font-size : 120%; } +h3 { font-size : 100%; } + +table { padding-left : 1.5em } +td, th { text-align : left; + padding-left : 0.25em; + padding-right : 0.25em; } +td { vertical-align: baseline } +span.red { color: red; } +span.dangerous { color: rgb(191,127,0); } +"""), SCRIPT(''''''var old_query_value = ""; + +function selectSearch() { + document.searchForm.query.focus(); +} + +function onSearch(query) { + if (old_query_value == "") { + if (query.length > 5) { + old_query_value = query; + document.searchForm.submit(); + } else { + old_query_value = query; + } + } +} +'''''')).toHTML() + + def __init__(self, socket_name, db_name): + WebService.__init__(self, socket_name) + self.db = security_db.DB(db_name) + self.register('''', self.page_home) + self.register(''*'', self.page_object) + self.register(''source-package/*'', self.page_source_package) + self.register(''binary-package/*'', self.page_binary_package) + self.register(''status/release/stable'', self.page_status_release_stable) + self.register(''status/release/testing'', + self.page_status_release_testing) + 
self.register(''status/release/unstable'', + self.page_status_release_unstable) + self.register(''status/dtsa-candidates'', + self.page_status_dtsa_candidates) + self.register(''status/todo'', self.page_status_todo) + self.register(''status/itp'', self.page_status_itp) + self.register(''data/unknown-packages'', self.page_data_unknown_packages) + self.register(''data/missing-epochs'', self.page_data_missing_epochs) + self.register(''data/releases'', self.page_data_releases) + self.register(''data/funny-versions'', self.page_data_funny_versions) + + def page_home(self, path, params, url): + query = params.get(''query'', ('''',))[0] + if query: + if ''/'' in query: + return self.page_not_found(url, query) + else: + return RedirectResult(url.scriptRelativeFull(query)) + + return self.create_page( + url, ''Security Bug Tracker'', + [P( + """This is the experimental issue tracker for Debian''s testing +security team. Keep in mind that this is merely a prototype. +Please report any problems to """, + A("mailto:fw@deneb.enyo.de", "Florian Weimer"), + """.Note that some of the data presented here is known +to be wrong (see below), but the data for the testing suite +should be fine."""), + make_menu( + url.scriptRelative, + (''status/release/stable'', + ''Vulnerable packages in the stable suite''), + (''status/release/testing'', + ''Vulnerable packages in the testing suite''), + (''status/release/unstable'', + ''Vulnerable packages in the unstable suite''), + (''status/dtsa-candidates'', "Candidates for DTSAs"), + (''status/todo'', ''TODO items''), + (''status/itp'', ''ITPs with potential security issues''), + (''data/unknown-packages'', + ''Packages names not found in the archive''), + (''data/missing-epochs'', + ''Package versions which might lack an epoch''), + (''data/funny-versions'', + ''Packages with strange version numbers''), + (''data/releases'', + ''Covered Debian releases and architectures (slow)''), + self.make_search_button(url)), + + H2("A few notes on data 
sources"), + P("""Data in this tracker comes solely from the bug database +which is maintained by Debian''s testing security team in their +Subversion repository. All external data (this includes +Debian bug reports and official Debian security advisories) +must be added to this database before it appears here, and there +can be some delay before this happens."""), + P("""At the moment, the database only contains information which is +relevant for tracking the security status of the stable, testing and +unstable suites. This means that data for oldstable is likely wrong.""")], + search_in_page=True) + + def page_object(self, path, params, url): + obj = path[0] + + if not obj: + # Redirect to start page. + return RedirectResult(url.scriptRelativeFull("")) + + if ''A'' <= obj[0] <= ''Z'': + # Bug names start with a capital letter. + return self.page_bug(url, obj) + + bugnumber = 0 + try: + bugnumber = int(obj) + except ValueError: + pass + if bugnumber: + return self.page_debian_bug(url, bugnumber) + + c = self.db.cursor() + if self.db.isSourcePackage(c, obj): + return RedirectResult(self.url_source_package(url, obj, full=True)) + if self.db.isBinaryPackage(c, obj): + return RedirectResult(self.url_binary_package(url ,obj, full=True)) + + return self.page_not_found(url, obj) + + def page_bug(self, url, name): + cursor = self.db.cursor() + try: + bug = bugs.BugFromDB(cursor, name) + except ValueError: + return self.page_not_found(url, name) + if bug.name <> name: + # Show the normalized bug name in the browser address bar. 
+ return RedirectResult(url.scriptRelativeFull(bug.name)) + + page = [] + + def gen_header(): + yield B("Name"), bug.name + + source = bug.name.split(''-'')[0] + if source in (''CAN'', ''CVE''): + source_xref = self.make_cve_ref(url, bug.name, ''CVE'') + elif source == ''DSA'': + source_xref = self.make_dsa_ref(url, bug.name, ''Debian'') + elif source == ''DTSA'': + source_xref = ''Debian Testing Security Team'' + elif source == ''FAKE'': + source_xref = ( + ''Automatically generated temporary name. Not for external reference.'') + else: + source_xref = None + + if source_xref: + yield B("Source"), source_xref + + if bug.description: + yield B("Description"), bug.description + + xref = list(self.db.getBugXrefs(cursor, bug.name)) + if xref: + yield B("References"), self.make_xref_list(url, xref) + + debian_bugs = bug.getDebianBugs(cursor) + if debian_bugs: + yield (B("Debian Bugs"), + self.make_debian_bug_list(url, debian_bugs)) + + if not bug.not_for_us: + for (release, status, reason) in bug.getStatus(cursor): + if status <> ''fixed'': + reason = self.make_red(reason) + yield B(''Status of %s'' % release), reason + + page.append(make_table(gen_header())) + + if bug.notes: + page.append(H2("Vulnerable and fixed packages")) + + def gen_source(): + old_pkg = '''' + for (package, release, version, vulnerable) \ + in self.db.getSourcePackages(cursor, bug.name): + if package == old_pkg: + package = '''' + else: + old_pkg = package + package = compose( + self.make_source_package_ref(url, package), + " (", self.make_pts_ref(url, package, ''PTS''), ")") + if vulnerable: + vuln = self.make_red(''vulnerable'') + version = self.make_red(version) + else: + vuln = ''fixed'' + + yield package, '', ''.join(release), version, vuln + + page.append(make_table(gen_source(), + caption=("Source Package", "Release", "Version", "Status"), + introduction=P(''The table below lists information on source packages.''))) + + def gen_binary(): + old_pkg = '''' + for (packages, releases, 
version, archs, vulnerable) \ + in self.db.getBinaryPackages(cursor, bug.name): + pkg = '', ''.join(packages) + if pkg == old_pkg: + packages = '''' + else: + old_pkg = pkg + packages = self.make_binary_packages_ref(url, packages) + + if vulnerable: + vuln = self.make_red(''vulnerable'') + version = self.make_red(version) + else: + vuln = ''fixed'' + yield (packages, + '', ''.join(releases), + version, vuln, + '', ''.join(archs)) + + page.append(make_table(gen_binary(), + caption=("Binary Package", "Release", "Version", "Status", + "Architecures"), + introduction=P("The next table lists affected binary packages."))) + + def gen_data(): + notes_sorted = bug.notes[:] + notes_sorted.sort(lambda a, b: cmp(a.package, b.package)) + for n in notes_sorted: + if n.release: + rel = str(n.release) + else: + rel = ''(unstable)'' + urgency = str(n.urgency) + if n.fixed_version: + ver = str(n.fixed_version) + if ver == ''0'': + ver = ''(not affected)'' + urgency = '''' + else: + ver = self.make_red(''(unfixed)'') + + pkg = n.package + pkg_kind = n.package_kind + if pkg_kind == ''source'': + pkg = self.make_source_package_ref(url, pkg) + elif pkg_kind == ''binary'': + pkg = self.make_binary_package_ref(url, pkg) + elif pkg_kind == ''itp'': + pkg_kind = ''ITP'' + rel = '''' + ver = '''' + urgency = '''' + + bugs = n.bugs + bugs.sort() + bugs = make_list( + map(lambda x: self.make_debian_bug(url, x), bugs)) + if n.bug_origin: + origin = self.make_xref(url, n.bug_origin) + else: + origin = '''' + yield (pkg, pkg_kind, rel, ver, urgency, origin, bugs) + + page.append( + make_table(gen_data(), + caption=("Package", "Type", "Release", "Fixed Version", + "Urgency", "Origin", "Debian Bugs"), + introduction=P("The information above is based on the following data on fixed versions."))) + + if bug.comments: + page.append(H2("Notes")) + def gen_comments(): + for (t, c) in bug.comments: + yield c + page.append(make_pre(gen_comments())) + + return self.create_page(url, bug.name, page) + + def 
page_debian_bug(self, url, bugnumber): + buglist = list(self.db.getBugsFromDebianBug(self.db.cursor(), + bugnumber)) + if buglist: + if len(buglist) == 1: + # Single issue, redirect. + return RedirectResult(url.scriptRelativeFull(buglist[0][0])) + + def gen(): + for (name, urgency, description) in buglist: + if urgency == "unknown": + urgency = "" + yield self.make_xref(url, name), urgency, description + + return self.create_page( + url, "Information related to Debian bug #%d" % bugnumber, + [P("The following issues reference to Debian bug ", + self.make_debian_bug(url, bugnumber), ":"), + make_table(gen(), + caption=("Name", "Urgency", "Description"))]) + + else: + return self.page_not_found(url, str(bugnumber)) + + def page_not_found(self, url, query): + return self.create_page(url, ''Not found'', + [P(''Your query '', + CODE(query), + '' matched no results.'')], + status=404) + + def page_source_package(self, path, params, url): + pkg = path[0] + + def gen_versions(): + for (releases, version) in self.db.getSourcePackageVersions( + self.db.cursor(), pkg): + yield '', ''.join(releases), version + def gen_binary(): + for (packages, releases, archs, version) \ + in self.db.getBinaryPackagesForSource( + self.db.cursor(), pkg): + yield (self.make_binary_packages_ref(url, packages), + '', ''.join(releases), version, '', ''.join(archs)) + def gen_bug_list(lst): + for (bug, description) in lst: + yield self.make_xref(url, bug), description + + return self.create_page( + url, "Information on source package " + pkg, + [make_menu(lambda x: x, + (self.url_pts(url, pkg), + pkg + '' in the Package Tracking System''), + (self.url_debian_bug_pkg(url, pkg), + pkg + '' in the Bug Tracking System''), + (self.url_testing_status(url, pkg), + pkg + '' in the testing migration checker'')), + H2("Available versions"), + make_table(gen_versions(), caption=("Release", "Version")), + + H2("Available binary packages"), + make_table(gen_binary(), + caption=(''Package'', ''Release'', 
''Version'', ''Architectures''), + replacement="""No binary packages are recorded in this database. +This probably means that the package is architecture-specific, and the +architecture is currently not tracked."""), + + H2("Open issues"), + make_table(gen_bug_list(self.db.getBugsForSourcePackage + (self.db.cursor(), pkg, True)), + caption=(''Bug'', ''Description''), + replacement=''No known open issues.''), + + H2("Resolved issues"), + make_table(gen_bug_list(self.db.getBugsForSourcePackage + (self.db.cursor(), pkg, False)), + caption=(''Bug'', ''Description''), + replacement=''No known resolved issues.'')]) + + def page_binary_package(self, path, params, url): + pkg = path[0] + + def gen_versions(): + for (releases, source, version, archs) \ + in self.db.getBinaryPackageVersions(self.db.cursor(), pkg): + yield ('', ''.join(releases), + self.make_source_package_ref(url, source), + version, '', ''.join(archs)) + def gen_bug_list(lst): + for (bug, description) in lst: + yield self.make_xref(url, bug), description + + return self.create_page( + url, "Information on binary package " + pkg, + [make_menu(lambda x: x, + (self.url_debian_bug_pkg(url, pkg), + pkg + '' in the Bug Tracking System'')), + H2("Available versions"), + make_table(gen_versions(), + caption=("Release", "Source", "Version", "Architectures")), + + H2("Open issues"), + make_table(gen_bug_list(self.db.getBugsForBinaryPackage + (self.db.cursor(), pkg, True)), + caption=(''Bug'', ''Description''), + replacement=''No known open issues.''), + + H2("Resolved issues"), + make_table(gen_bug_list(self.db.getBugsForBinaryPackage + (self.db.cursor(), pkg, False)), + caption=(''Bug'', ''Description''), + replacement=''No known resolved issues.''), + + H2("Non-issues"), + make_table(gen_bug_list(self.db.getNonBugsForBinaryPackage + (self.db.cursor(), pkg)), + caption=(''Bug'', ''Description''), + replacement="""No known issues which do not affect +this package, but still reference it.""")]) + + def 
page_status_release_stable(self, path, params, url): + def gen(): + old_pkg_name = '''' + for (pkg_name, bug_name, archive, urgency) in \ + self.db.cursor().execute( + """SELECT package, bug, section, urgency FROM stable_status"""): + if pkg_name == old_pkg_name: + pkg_name = '''' + else: + old_pkg_name = pkg_name + if archive <> ''main'': + pkg_name = "%s (%s)" % (pkg_name, archive) + + if urgency == ''unknown'': + urgency = '''' + elif urgency == ''high'': + urgency = self.make_red(urgency) + + yield pkg_name, self.make_xref(url, bug_name), urgency + + return self.create_page( + url, ''Vulnerable source packages in the stable suite'', + [make_table(gen(), caption=("Package", "Bug", "Urgency"))]) + + def page_status_release_testing(self, path, params, url): + def gen(): + old_pkg_name = '''' + for (pkg_name, bug_name, archive, urgency, + sid_vulnerable, ts_fixed) in self.db.cursor().execute( + """SELECT package, bug, section, urgency, unstable_vulnerable, + testing_security_fixed + FROM testing_status"""): + if pkg_name == old_pkg_name: + pkg_name = '''' + else: + old_pkg_name = pkg_name + if archive <> ''main'': + pkg_name = "%s (%s)" % (pkg_name, archive) + + if ts_fixed: + status = ''fixed in testing-security'' + else: + if sid_vulnerable: + status = self.make_red(''unstable is vulnerable'') + else: + status = self.make_dangerous(''fixed in unstable'') + + if urgency == ''unknown'': + urgency = '''' + + yield (pkg_name, self.make_xref(url, bug_name), + urgency, status) + + return self.create_page( + url, ''Vulnerable source packages in the testing suite'', + [make_menu(url.scriptRelative, + ("status/dtsa-candidates", "Candidates for DTSAs")), + make_table(gen(), caption=("Package", "Bug"))]) + + def page_status_release_unstable(self, path, params, url): + def gen(): + old_pkg_name = '''' + for (pkg_name, bug_name, section, urgency) \ + in self.db.cursor().execute( + """SELECT DISTINCT sp.name, st.bug_name, + sp.archive, st.urgency + FROM source_package_status 
AS st, source_packages AS sp + WHERE st.vulnerable AND st.urgency <> ''unimportant'' + AND sp.rowid = st.package AND sp.release = ''sid'' + AND sp.subrelease = '''' + ORDER BY sp.name, st.bug_name"""): + if pkg_name == old_pkg_name: + pkg_name = '''' + else: + old_pkg_name = pkg_name + if section <> ''main'': + pkg_name = "%s (%s)" % (pkg_name, section) + else: + pkg_name = self.make_xref(url, pkg_name) + + if urgency == ''unknown'': + urgency = '''' + elif urgency == ''high'': + urgency = self.make_red(urgency) + + yield pkg_name, self.make_xref(url, bug_name), urgency + + + return self.create_page( + url, ''Vulnerable source packages in the testing suite'', + [P("""Note that the list below is based on source packages. + This means that packages are not listed here once a new, + fixed source version has been uploaded to the archive, even + if there are still some vulnerably binary packages present + in the archive."""), + make_table(gen(), caption=(''Package'', ''Bug'', ''Urgency''))]) + + def page_status_dtsa_candidates(self, path, params, url): + def gen(): + old_pkg_name = '''' + for (pkg_name, bug_name, archive, urgency, stable_later) \ + in self.db.cursor().execute( + """SELECT package, bug, section, urgency, + (SELECT testing.version_id < stable.version_id + FROM source_packages AS testing, source_packages AS stable + WHERE testing.name = testing_status.package + AND testing.release = ''etch'' + AND testing.subrelease = '''' + AND testing.archive = testing_status.section + AND stable.name = testing_status.package + AND stable.release = ''sarge'' + AND stable.subrelease = ''security'' + AND stable.archive = testing_status.section) + FROM testing_status + WHERE (NOT unstable_vulnerable) + AND (NOT testing_security_fixed)"""): + if pkg_name == old_pkg_name: + pkg_name = '''' + migration = '''' + else: + old_pkg_name = pkg_name + migration = A(self.url_testing_status(url, pkg_name), + "check") + if archive <> ''main'': + pkg_name = "%s (%s)" % (pkg_name, 
archive) + else: + pkg_name = self.make_source_package_ref(url, pkg_name) + + if urgency == ''unknown'': + urgency = '''' + elif urgency == ''high'': + urgency = self.make_red(urgency) + + if stable_later: + notes = "(fixed in stable?)" + else: + notes = '''' + + yield (pkg_name, migration, self.make_xref(url, bug_name), + urgency, notes) + + return self.create_page( + url, "Candidates for DTSAs", + [P("""The table below lists packages which are fixed +in unstable, but unfixed in testing. Use the testing migration +tracker to find out why they have not entered +testing yet."""), + make_menu(url.scriptRelative, + ("status/release/testing", + "List of vulnerable packages in testing")), + make_table(gen(), + caption=("Package", "Migration", "Bug", "Urgency"))]) + + def page_status_todo(self, path, params, url): + def gen(): + for (bug, description) in self.db.getTODOs(): + yield self.make_xref(url, bug), description + return self.create_page( + url, "Bugs with TODO items", + [make_table(gen(), + caption=("Bug", "Description"))]) + + def page_status_itp(self, path, params, url): + def gen(): + old_pkg = '''' + for pkg, bugs, debian_bugs in self.db.getITPs(self.db.cursor()): + if pkg == old_pkg: + pkg = '''' + else: + old_pkg = pkg + yield (pkg, self.make_xref_list(url, bugs), + self.make_debian_bug_list(url, debian_bugs)) + return self.create_page( + url, "ITPs with potential security issues", + [make_table(gen(), caption=("Package", "Issue", "Debian Bugs"), + replacement="No ITP bugs are currently known.")]) + + def page_data_unknown_packages(self, path, params, url): + def gen(): + for name, bugs in self.db.getUnknownPackages(self.db.cursor()): + yield name, self.make_xref_list(url, bugs) + return self.create_page( + url, "Unknown packages", + [P("""Sometimes, a package referenced in a bug report +cannot be found in the database. 
This can be the result of a spelling +error, or a historic entry refers to a +package which is no longer in the archive."""), + make_table(gen(), caption=("Package", "Bugs"), + replacement="No unknown packages are referenced in the database.")]) + + def page_data_missing_epochs(self, path, params, url): + def gen(): + old_bug = '''' + old_pkg = '''' + for bug, pkg, ver1, ver2 in self.db.cursor().execute( + """SELECT DISTINCT bug_name, n.package, + n.fixed_version, sp.version + FROM package_notes AS n, source_packages AS sp + WHERE n.package_kind = ''source'' + AND n.fixed_version NOT LIKE ''%:%'' + AND n.fixed_version <> ''0'' + AND n.bug_origin = '''' + AND sp.name = n.package + AND sp.version LIKE ''%:%'' + ORDER BY bug_name, package"""): + if bug == old_bug: + bug = '''' + else: + old_bug = bug + old_pkg = '''' + bug = self.make_xref(url, bug) + if pkg == old_pkg: + pkg = '''' + else: + old_pkg = pkg + pkg = self.make_source_package_ref(url, pkg) + yield bug, pkg, ver1, ver2 + + return self.create_page( + url, "Missing epochs in package versions", + [make_table(gen(), + caption=("Bug", "Package", "Version 1", "Version 2"), + replacement="No source package version with missing epochs.")]) + + def page_data_releases(self, path, params, url): + def gen(): + for (rel, subrel, archive, sources, archs) \ + in self.db.availableReleases(): + if sources: + sources = ''yes'' + else: + sources = ''no'' + yield rel, subrel, archive, sources, make_list(archs) + return self.create_page( + url, "Available releases", + [P("""The security issue database is checked against +the Debian releases listed in the table below."""), + make_table(gen(), + caption=("Release", "Subrelease", "Archive", + "Sources", "Architectures"))]) + + def page_data_funny_versions(self, path, params, url): + def gen(): + for name, release, archive, version, source_version \ + in self.db.getFunnyPackageVersions(): + yield name, release, archive, source_version, version + 
+ return self.create_page( + url, "Version conflicts between source/binary packages", + [P("""The table below lists source packages + which have a binary package of the same name, but with a different + version. This means that extra care is necessary to determine + the version of a package which has been fixed. (Note that + the bug tracker prefers source versions to binary versions + in this case.)"""), + make_table(gen(), + caption=("Package", + "Release", + "Archive", + "Source Version", + "Binary Version")), + P("""Technically speaking, these version numbering is fine, +but it makes version-based bug tracking quite difficult for these packages."""), + P("""There are many binary packages which are built from source + packages with different version numbering schemes. However, as + long as none of the binary packages carries the same name as the + source package, most confusion is avoided or can be easily + explained.""")]) + + + def create_page(self, url, title, body, search_in_page=False, status=200): + append = body.append + append(HR()) + if not search_in_page: + append(self.make_search_button(url)) + append(P(A(url.scriptRelative(""), "Home"), + " - ", A(url.absolute("http://secure-testing.debian.net/"), + "Testing Security Team"), + " - ", A(url.absolute("http://www.debian.org/security/"), + "Debian Security"), + " - ", A(url.absolute + ("http://www.enyo.de/fw/impressum.html"), + "Imprint"))) + if search_in_page: + on_load = "selectSearch()" + else: + on_load = None + return HTMLResult(self.add_title(title, body, + head_contents=self.head_contents, + body_attribs={''onload'': on_load}), + doctype=self.html_dtd(), + status=status) + + def make_search_button(self, url): + return FORM("Search for package or bug name: ", + INPUT(type=''text'', name=''query'', + onkeyup="onSearch(this.value)", + onmousemove="onSearch(this.value)"), + INPUT(type=''submit'', value=''Go''), + method=''get'', + action=url.scriptRelative('''')) + + def url_cve(self, url, name): + 
return url.absolute("http://cve.mitre.org/cgi-bin/cvename.cgi", + name=name) + def url_dsa(self, url, dsa, re_dsa=re.compile(r''^DSA-(\d+)(?:-\d+)?$'')): + match = re_dsa.match(dsa) + if match: + # We must determine the year because there is no generic URL. + (number,) = match.groups() + for (date,) in self.db.cursor().execute( + "SELECT release_date FROM bugs WHERE name = ?", (dsa,)): + (y, m, d) = date.split(''-'') + return url.absolute("http://www.debian.org/security/%d/dsa-%d" + % (int(y), int(number))) + return None + + def url_debian_bug(self, url, debian): + return url.absolute("http://bugs.debian.org/cgi-bin/bugreport.cgi", + bug=str(debian)) + def url_debian_bug_pkg(self, url, debian): + return url.absolute("http://bugs.debian.org/cgi-bin/pkgreport.cgi", + pkg=debian) + def url_pts(self, url, package): + return url.absolute("http://packages.qa.debian.org/common/index.html", + src=package) + def url_testing_status(self, url, package): + return url.absolute("http://bjorn.haxx.se/debian/testing.pl", + package=package) + def url_source_package(self, url, package, full=False): + if full: + return url.scriptRelativeFull("source-package/" + package) + else: + return url.scriptRelative("source-package/" + package) + def url_binary_package(self, url, package, full=False): + if full: + return url.scriptRelativeFull("binary-package/" + package) + else: + return url.scriptRelative("binary-package/" + package) + + def make_xref(self, url, name): + return A(url.scriptRelative(name), name) + + def make_xref_list(self, url, lst, separator='', ''): + return make_list(map(lambda x: self.make_xref(url, x), lst), separator) + + def make_debian_bug(self, url, debian): + return A(self.url_debian_bug(url, debian), str(debian)) + def make_debian_bug_list(self, url, lst): + return make_list(map(lambda x: self.make_debian_bug(url, x), lst)) + + def make_cve_ref(self, url, cve, name=None): + if name is None: + name = cve + return A(self.url_cve(url, cve), name) + + def 
make_dsa_ref(self, url, dsa, name=None): + if name is None: + name = dsa + u = self.url_dsa(url, dsa) + if u: + return A(u, name) + else: + return name + + def make_pts_ref(self, url, pkg, name=None): + if name is None: + name = pkg + return A(self.url_pts(url, pkg), name) + + def make_source_package_ref(self, url, pkg, title=None): + if title is None: + title = pkg + return A(self.url_source_package(url, pkg), title) + def make_binary_package_ref(self, url, pkg, title=None): + if title is None: + title = pkg + return A(self.url_binary_package(url, pkg), title) + def make_binary_packages_ref(self, url, lst): + assert type(lst) <> types.StringType + return make_list(map(lambda x: self.make_binary_package_ref(url, x), + lst)) + + def make_red(self, contents): + return SPAN(contents, _class="red") + + def make_dangerous(self, contents): + return SPAN(contents, _class="dangerous") + + def pre_dispatch(self): + self.db.refresh() + +TrackerService(socket_name, db_name).run() Property changes on: bin/tracker_service.py ___________________________________________________________________ Name: svn:mime-type + text/script Added: lib/python/web_support.py ==================================================================--- lib/python/web_support.py 2005-10-20 09:01:57 UTC (rev 2481) +++ lib/python/web_support.py 2005-10-20 09:02:12 UTC (rev 2482) @@ -0,0 +1,692 @@ +# web_support.py -- simple HTTP generation framework +# Copyright (C) 2005 Florian Weimer <fw@deneb.enyo.de> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +import cgi +import cStringIO +import os +import re +import socket +import struct +import sys +import grp +import traceback +import types +import urllib + +class ServinvokeError(Exception): + pass + +class Service: + """A class for service objects. + + Service objects are contacted by the program servinvoke and + process HTTP requests in a serialized fashion. (Only the data + transfer from and to the client happens in parallel, and this is + handled by the servinvoke program.) + + If the newly created socket is owned by the www-data group, it is + automatically made readable by that group. + """ + + def __init__(self, socket_name): + self.socket_name = socket_name + self._unlinkSocket() + self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) + self.socket.bind(self.socket_name) + self.socket.listen(5) + self._chmod() + + def __del__(self): + self._unlinkSocket() + + def _unlinkSocket(self): + try: + os.unlink(self.socket_name) + except OSError: + pass + + def _chmod(self): + gid = os.stat(self.socket_name).st_gid + grpent = grp.getgrgid(gid) + if grpent[0] == ''www-data'': + os.chmod(self.socket_name, 0660) + + def log(self, msg, *args): + sys.stderr.write((msg % args) + "\n") + + def run(self): + while 1: + (client, addr) = self.socket.accept() + + def read(count): + data = '''' + cnt = 0 + while cnt <> count: + d = client.recv(count - cnt) + if d: + data += d + cnt = len(data) + else: + self.log("unexpected end of data from servinvoke") + raise ServinvokeError() + + return data + + try: + header = read(24) + (magic, version, cli_size, cli_count, env_size, env_count) = \ + struct.unpack("!6I", header) + if magic <> 0x15fd34df: + sys.log("unknown magic number %08X", magic) + if version <> 1: + sys.log("unknown version %08X", magic) + cli = 
read(cli_size).split(''\0'')[:-1] + env = {} + for x in read(env_size).split(''\0'')[:-1]: + (key, value) = x.split(''='', 1) + env[key] = value + data = [] + while 1: + d = client.recv(4096) + if d: + data.append(d) + else: + break + data = ''''.join(data) + result = cStringIO.StringIO() + self.handle(cli, env, data, result) + client.sendall(result.getvalue()) + client.close() + + except ServinvokeError: + client.close() + pass + except KeyboardInterrupt: + client.close() + raise + except: + client.close() + target = cStringIO.StringIO() + traceback.print_exc(None, target) + self.log("%s", target.getvalue()) + + def handle(args, environ, data): + """Invoke by run to handle a single request. Should + return the data to be sent back to the client.""" + return "" + +class URL: + """A simple wrapper class for strings which are interpreted as URLs.""" + def __init__(self, url): + self.__url = url + def __str__(self): + return self.__url + def __repr__(self): + return "URL(%s)" % `self.__url` + +class URLFactory: + """Creates URL objects. + + This factory class handles the case where a script wants to + generate URLs which reference to itself (see scriptRelative).""" + + def __init__(self, server_name, script_name): + self.server_name = server_name or ''localhost'' + script_name = self._stripSlashes(script_name or '''') + if script_name[-1:] == ''/'' or script_name == '''': + self.script_name = script_name + else: + self.script_name = script_name + ''/'' + + def _convertArgs(self, args): + arglist = [] + for (key, value) in args.items(): + if value is None: + continue + arglist.append("%s=%s" % (urllib.quote(key), + urllib.quote(value))) + if arglist: + return "?" 
+ ''&''.join(arglist) + else: + return "" + def _stripSlashes(self, arg): + while arg[:1] == ''/'': + arg = arg[1:] + return arg + + def absolute(self, url, **args): + """Creates an absolute URL, with optional arguments to pass.""" + + return URL(url + self._convertArgs(args)) + + def scriptRelative(self, path, **args): + """Returns a URL which references to the path relative to the + current script. Optionally, arguments to pass can be included.""" + + return URL("/%s%s%s" % (self.script_name, + self._stripSlashes(path), + self._convertArgs(args))) + + def scriptRelativeFull(self, path, **args): + """Like scriptRelative, but returns an absolute URL, including + the http:// prefix.""" + + return URL("http://%s/%s%s%s" % (self.server_name, self.script_name, + self._stripSlashes(path), + self._convertArgs(args))) + + +stringToHTML = map(chr, range(256)) +def _initStringToHTML(s): + for (ch, repl) in ((''<'', ''&lt;''), + (''>'', ''&gt;''), + (''&'', ''&amp;''), + (''"'', ''&quot;'')): + s[ord(ch)] = repl +_initStringToHTML(stringToHTML) +del _initStringToHTML + +def escapeHTML(str): + ''''''Replaces the characters <>&" in the passed strings with their + HTML entities.'''''' + + result = [] + append = result.append + for ch in str: + append(stringToHTML[ord(ch)]) + return ''''.join(result) + +class HTMLBase: + def flatten(self, write): + """Invokes write repeatedly, for the tag and its contents. + + Note that typically, a lot of very short strings a written, so + it''s better to add some buffering before sending the strings + elsewhere.""" + pass + + def toString(self): + """Invokes flatten to create a new string object.""" + r = cStringIO.StringIO() + self.flatten(r.write) + return r.getvalue() + + def toHTML(self): + return VerbatimHTML(self.toString()) + +class VerbatimHTML(HTMLBase): + """Creates verbatim HTML from a string object. 
class Tag(HTMLBase):
    """Base class for HTML tags.

    name     - the element name
    contents - a sequence (or iterator) of strings and HTML objects; a
               false value (None, an empty sequence) produces an empty
               element of the form <name/>
    attribs  - dictionary mapping attribute names to values; entries
               whose value is None are skipped

    Element and attribute names must match re_name, otherwise
    ValueError is raised.  A leading underscore in an attribute name is
    dropped, so that reserved words can be passed as keyword arguments
    (_class="red" yields class="red").
    """

    re_name = re.compile(r'\A_?[a-zA-Z][a-zA-Z0-9]*\Z')

    def __init__(self, name, contents, attribs=None):
        self._check(name)
        self.__name = name
        rendered = []
        # None instead of a shared {} default; both mean "no attributes".
        for (key, value) in (attribs or {}).items():
            if value is None:
                continue
            self._check(key)
            if key[0] == '_':
                key = key[1:]
            # The attributes are rendered once, here, so that flatten
            # only has to paste the finished string.
            rendered.append(' %s="%s"' % (key, escapeHTML(str(value))))
        self.__attribs = ''.join(rendered)
        self.contents = contents

    def _check(self, name):
        """Raises ValueError unless name is a valid element or
        attribute name."""
        if not self.re_name.match(name):
            raise ValueError("invalid name: " + repr(name))

    def flatten(self, write):
        """Invokes write for the opening tag, the escaped contents and
        the closing tag (or for the single empty-element tag)."""
        if not self.contents:
            write("<%s%s/>" % (self.__name, self.__attribs))
            return

        write("<%s%s>" % (self.__name, self.__attribs))
        closing = "</%s>" % self.__name
        try:
            for x in self.contents:
                if isinstance(x, str):
                    write(escapeHTML(x))
                else:
                    x.flatten(write)
        except:
            # If we encountered any exception, try to write the
            # closing tag nevertheless.  This increases our
            # chances that we produce valid XML.  The original
            # exception is re-raised in any case.
            try:
                write(closing)
            except Exception:
                pass
            raise
        write(closing)

    def __repr__(self):
        return "<websupport.Tag instance, name=%r>" % (self.__name,)
def A(url, text=None):
    """Creates a hyperlink (an 'a' element).

    url  - the link target; a string or an object whose str() is the
           target, such as a URL instance
    text - the link text; defaults to the target itself

    The default text is taken from str(url), not from url.  A URL
    object is not a sequence, so it cannot be used as tag contents.
    """
    href = str(url)
    if text is None:
        text = href
    return tag('a', text, href=href)
def make_pre(lines):
    """Creates a pre-formatted text area.

    lines - a sequence (or iterator) of strings, one per line of text

    Returns a 'pre' tag whose contents are all the lines, each followed
    by a newline.  An empty sequence gives an empty 'pre' element.
    """
    # Join the collected pieces, not the loop variable: the latter would
    # render only the final line and fail for an empty sequence.
    pieces = []
    append = pieces.append
    for line in lines:
        append(line)
        append('\n')
    return tag('pre', ''.join(pieces))
def get(self, path):
    """Returns a tuple (VALUE, REMAINING-PATH), for the
    most-specific path matching the given path.

    path - the request path, elements separated by '/'

    Empty path elements (leading, trailing and duplicated slashes) are
    ignored, also in REMAINING-PATH.  A '*' registration matches
    exactly one unmatched trailing element, a '**' registration any
    number of them.  Raises InvalidPath if nothing matches.
    """

    elements = [e for e in path.split('/') if e]
    count = len(elements)
    node = self.__map
    for (index, element) in enumerate(elements):
        # The keys '*' and '**' hold wildcard values, not sub-tables,
        # so a request element spelled that way must never be used
        # to descend.
        if element not in ('*', '**') and element in node:
            node = node[element]
            continue
        if index + 1 == count and '*' in node:
            # Use '*' only if this is the last element of the path.
            return (node['*'], (element,))
        if '**' in node:
            return (node['**'], tuple(elements[index:]))
        raise InvalidPath

    # The whole path was consumed.  Prefer the value registered for
    # this exact path, then fall back to the wildcards.
    for key in ('', '*', '**'):
        if key in node:
            return (node[key], ())
    raise InvalidPath
def handle(self, args, environment, data, result):
    """Dispatches one request to the method registered for its path.

    args        - not used by this method
    environment - dictionary of CGI-style variables; PATH_INFO,
                  SERVER_NAME and SCRIPT_NAME are read here, and the
                  dictionary is handed to cgi.parse
    data        - the request body, as a string
    result      - file-like object receiving the complete response

    If no method is registered for the path, a plain-text 404 response
    is written.  Otherwise pre_dispatch is invoked, then the method is
    called with the remaining path, the request parameters and a
    URLFactory, and the Result object it returns is flattened into
    result.
    """
    # cgi.parse reads a POST body through fp.read(), so the body
    # string has to be wrapped in a file-like object.
    params = cgi.parse(cStringIO.StringIO(data), environment)
    path = environment.get('PATH_INFO', '')
    server_name = environment.get('SERVER_NAME', '')
    script_name = environment.get('SCRIPT_NAME', '')

    try:
        (method, remaining) = self.__router.get(path)
    except InvalidPath:
        self.__writeError(result, 404, "page not found")
        return

    # pre_dispatch takes the URL factory as its argument, so the
    # factory has to exist before the hook is run.
    url = URLFactory(server_name, script_name)
    self.pre_dispatch(url)
    r = method(remaining, params, url)
    assert isinstance(r, Result), repr(r)
    r.flatten(result.write)
str(u.scriptRelativeFull("/a/b", t=''123'')) \ + == "http://localhost/a/b?t=123" + + u = URLFactory(server_name=''localhost.localdomain'', + script_name=''/cgi-bin/test.cgi'') + assert str(u.scriptRelative("a/b", t=''123'')) \ + == "/cgi-bin/test.cgi/a/b?t=123" + assert str(u.scriptRelativeFull("a/b", t=''123='')) \ + == "http://localhost.localdomain/cgi-bin/test.cgi/a/b?t=123%3D" + + assert P("").toString() == ''<p></p>'' + assert P(" ").toString() == ''<p> </p>'' + assert P("&").toString() == ''<p>&</p>'' + assert P("\"").toString() == ''<p>"</p>'' + assert P("<").toString() == ''<p><</p>'' + assert P(">").toString() == ''<p>></p>'' + assert P(">").toHTML().toString() == ''<p>></p>'' + assert FORM(method=''get'').toString() == ''<form method="get"/>'' + assert SPAN("green", _class="red").toString() \ + == ''<span class="red">green</span>'' + assert TD(A("http://www.example.net/", "example")).toString() \ + == ''<td><a href="http://www.example.net/">example</a></td>'' + + s = cStringIO.StringIO() + RedirectResult(u.scriptRelativeFull("123")).flatten(s.write) + assert s.getvalue() == ''''''Status: 301 +Location: http://localhost.localdomain/cgi-bin/test.cgi/123 + +'''''' + + assert make_menu(u.scriptRelative, + ("123", "A"), + ("456", "B")).toString() == \ + ''<ul><li><a href="/cgi-bin/test.cgi/123">A</a></li><li><a href="/cgi-bin/test.cgi/456">B</a></li></ul>'' + + pr = PathRouter() + pr.register('''', "root") + pr.register(''/*'', "default") + pr.register(''/abc'', "/abc") + pr.register(''/a/bc'', "/a/bc") + pr.register(''ab/c'', "/ab/c") + pr.register(''/a'', "/a") + pr.register(''/a/**'', "/a/**") + pr.register(''/a/*'', "/a/*") + + assert pr.get("") == ("root", ()) + assert pr.get("/") == ("root", ()) + assert pr.get("//") == ("root", ()) + assert pr.get("/xyz") == ("default", ("xyz",)) + assert pr.get("/a//xyz/") == ("/a/*", ("xyz",)) + assert pr.get("/a//xyz/123") == ("/a/**", ("xyz", "123")) + assert pr.get("/abc") == ("/abc", ()) + +if __name__ == 
"__main__": + __test()