diff options
author | Guido Günther <agx@sigxcpu.org> | 2011-11-22 16:49:30 +0100 |
---|---|---|
committer | Guido Günther <agx@sigxcpu.org> | 2011-11-22 16:54:09 +0100 |
commit | 29b8b036102f3bb0cb4fa19d66bf8fae274d5537 (patch) | |
tree | 6ec14e533f4e20d3450821da4c5583a71265931c /gbp/deb | |
parent | 3e8b3a23874ddbd9fbf6590be39437f9b846d87a (diff) |
Move debian related helpers into submodule
Diffstat (limited to 'gbp/deb')
-rw-r--r-- | gbp/deb/__init__.py | 618 |
1 files changed, 618 insertions, 0 deletions
diff --git a/gbp/deb/__init__.py b/gbp/deb/__init__.py new file mode 100644 index 0000000..881d1ae --- /dev/null +++ b/gbp/deb/__init__.py @@ -0,0 +1,618 @@ +# vim: set fileencoding=utf-8 : +# +# (C) 2006,2007,2011 Guido Günther <agx@sigxcpu.org> +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +"""provides some debian source package related helpers""" + +import commands +import email +import os +import re +import subprocess +import sys +import glob + +import gbp.command_wrappers as gbpc +from gbp.errors import GbpError +from gbp.git import GitRepositoryError + +# When trying to parse a version-number from a dsc or changes file, these are +# the valid characters. +debian_version_chars = 'a-zA-Z\d.~+-' + +# Valid package names according to Debian Policy Manual 5.6.1: +# "Package names (both source and binary, see Package, Section 5.6.7) +# must consist only of lower case letters (a-z), digits (0-9), plus (+) +# and minus (-) signs, and periods (.). They must be at least two +# characters long and must start with an alphanumeric character." +packagename_re = re.compile("^[a-z0-9][a-z0-9\.\+\-]+$") +packagename_msg = """Package names must be at least two characters long, start with an +alphanumeric and can only containg lower case letters (a-z), digits +(0-9), plus signs (+), minus signs (-), and periods (.)""" + +# Valid upstream versions according to Debian Policy Manual 5.6.12: +# "The upstream_version may contain only alphanumerics[32] and the +# characters . + - : ~ (full stop, plus, hyphen, colon, tilde) and +# should start with a digit. If there is no debian_revision then hyphens +# are not allowed; if there is no epoch then colons are not allowed." +# Since we don't know about any epochs and debian revisions yet, the +# last two conditions are not checked. +upstreamversion_re = re.compile("^[0-9][a-z0-9\.\+\-\:\~]*$") +upstreamversion_msg = """Upstream version numbers must start with a digit and can only containg lower case +letters (a-z), digits (0-9), full stops (.), plus signs (+), minus signs +(-), colons (:) and tildes (~)""" + +# compression types, extra options and extensions +compressor_opts = { 'gzip' : [ '-n', 'gz' ], + 'bzip2' : [ '', 'bz2' ], + 'lzma' : [ '', 'lzma' ], + 'xz' : [ '', 'xz' ] } + +# Map frequently used names of compression types to the internal ones: +compressor_aliases = { 'bz2' : 'bzip2', + 'gz' : 'gzip', } + +class NoChangelogError(Exception): + """no changelog found""" + pass + +class ParseChangeLogError(Exception): + """problem parsing changelog""" + pass + + +class DpkgCompareVersions(gbpc.Command): + cmd='/usr/bin/dpkg' + + def __init__(self): + if not os.access(self.cmd, os.X_OK): + raise GbpError, "%s not found - cannot use compare versions" % self.cmd + gbpc.Command.__init__(self, self.cmd, ['--compare-versions']) + + def __call__(self, version1, version2): + self.run_error = "Couldn't compare %s with %s" % (version1, version2) + res = gbpc.Command.call(self, [ version1, 'lt', version2 ]) + if res not in [ 0, 1 ]: + raise gbpc.CommandExecFailed, "%s: bad return code %d" % (self.run_error, res) + if res == 0: + return -1 + elif res == 1: + res = gbpc.Command.call(self, [ version1, 'gt', version2 ]) + if res not in [ 0, 1 ]: + raise gbpc.CommandExecFailed, "%s: bad return code %d" % (self.run_error, res) + if res == 0: + return 1 + return 0 + + +class UpstreamSource(object): + """ + Upstream source. Can be either an unpacked dir, a tarball or another type + of archive + + @cvar is_dir: are the upstream sources an unpacked dir + @type is_dir: boolean + @cvar _orig: are the upstream sources already suitable as an upstream + tarball + @type _orig: boolean + @cvar _path: path to the upstream sources + @type _path: string + @cvar _unpacked: path to the unpacked source tree + @type _unpacked: string + """ + def __init__(self, name, unpacked=None): + self.is_dir = False + self._orig = False + self._path = name + self.unpacked = unpacked + + self.is_dir = [False, True][os.path.isdir(name)] + self._check_orig() + if self.is_dir: + self.unpacked = self.path + + def _check_orig(self): + """Check if archive can be used as orig tarball""" + if self.is_dir: + self._orig = False + return + + parts = self._path.split('.') + try: + if parts[-2] == 'tar': + if (parts[-1] in compressor_opts or + parts[-1] in compressor_aliases): + self._orig = True + except IndexError: + self._orig = False + + @property + def is_orig(self): + return self._orig + + @property + def path(self): + return self._path.rstrip('/') + + def unpack(self, dir, filters=[]): + """ + Unpack packed upstream sources into a given directory + and determine the toplevel of the source tree. + """ + if self.is_dir: + raise GbpError, "Cannot unpack directory %s" % self.path + + if not filters: + filters = [] + + if type(filters) != type([]): + raise GbpError, "Filters must be a list" + + self._unpack_archive(dir, filters) + self.unpacked = self._unpacked_toplevel(dir) + + def _unpack_archive(self, dir, filters): + """ + Unpack packed upstream sources into a given directory. + """ + ext = os.path.splitext(self.path)[1] + if ext in [ ".zip", ".xpi" ]: + self._unpack_zip(dir) + else: + self._unpack_tar(dir, filters) + + def _unpack_zip(self, dir): + try: + gbpc.UnpackZipArchive(self.path, dir)() + except gbpc.CommandExecFailed: + raise GbpError, "Unpacking of %s failed" % self.path + + def _unpacked_toplevel(self, dir): + """unpacked archives can contain a leading directory or not""" + unpacked = glob.glob('%s/*' % dir) + unpacked.extend(glob.glob("%s/.*" % dir)) # include hidden files and folders + # Check that dir contains nothing but a single folder: + if len(unpacked) == 1 and os.path.isdir(unpacked[0]): + return unpacked[0] + else: + return dir + + def _unpack_tar(self, dir, filters): + """ + unpack a .orig.tar.gz to tmpdir, leave the cleanup to the caller in case of + an error + """ + try: + unpackArchive = gbpc.UnpackTarArchive(self.path, dir, filters) + unpackArchive() + except gbpc.CommandExecFailed: + # unpackArchive already printed an error message + raise GbpError + + def pack(self, newarchive, filters=[]): + """ + recreate a new archive from the current one + + @param newarchive: the name of the new archive + @type newarchive: string + @param filters: tar filters to apply + @type filters: array of strings + @return: the new upstream source + @rtype: UpstreamSource + """ + if not self.unpacked: + raise GbpError, "Need an unpacked source tree to pack" + + if not filters: + filters = [] + + if type(filters) != type([]): + raise GbpError, "Filters must be a list" + + try: + unpacked = self.unpacked.rstrip('/') + repackArchive = gbpc.PackTarArchive(newarchive, + os.path.dirname(unpacked), + os.path.basename(unpacked), + filters) + repackArchive() + except gbpc.CommandExecFailed: + # repackArchive already printed an error + raise GbpError + return UpstreamSource(newarchive) + + @staticmethod + def known_compressions(): + return [ args[1][-1] for args in compressor_opts.items() ] + + def guess_version(self, extra_regex=r''): + """ + Guess the package name and version from the filename of an upstream + archive. + + >>> UpstreamSource('foo-bar_0.2.orig.tar.gz').guess_version() + ('foo-bar', '0.2') + >>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version() + >>> UpstreamSource('git-bar-0.2.tar.gz').guess_version() + ('git-bar', '0.2') + >>> UpstreamSource('git-bar-0.2-rc1.tar.gz').guess_version() + ('git-bar', '0.2-rc1') + >>> UpstreamSource('git-bar-0.2:~-rc1.tar.gz').guess_version() + ('git-bar', '0.2:~-rc1') + >>> UpstreamSource('git-Bar-0A2d:rc1.tar.bz2').guess_version() + ('git-Bar', '0A2d:rc1') + >>> UpstreamSource('git-1.tar.bz2').guess_version() + ('git', '1') + >>> UpstreamSource('kvm_87+dfsg.orig.tar.gz').guess_version() + ('kvm', '87+dfsg') + >>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version() + >>> UpstreamSource('foo-Bar-a.b.tar.gz').guess_version() + >>> UpstreamSource('foo-bar_0.2.orig.tar.xz').guess_version() + ('foo-bar', '0.2') + >>> UpstreamSource('foo-bar_0.2.orig.tar.lzma').guess_version() + ('foo-bar', '0.2') + + @param extra_regex: additional regex to apply, needs a 'package' and a + 'version' group + @return: (package name, version) or None. + @rtype: tuple + """ + version_chars = r'[a-zA-Z\d\.\~\-\:\+]' + extensions = r'\.tar\.(%s)' % "|".join(self.known_compressions()) + + version_filters = map ( lambda x: x % (version_chars, extensions), + ( # Debian package_<version>.orig.tar.gz: + r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)\.orig%s', + # Upstream package-<version>.tar.gz: + r'^(?P<package>[a-zA-Z\d\.\+\-]+)-(?P<version>[0-9]%s*)%s')) + if extra_regex: + version_filters = extra_regex + version_filters + + for filter in version_filters: + m = re.match(filter, os.path.basename(self.path)) + if m: + return (m.group('package'), m.group('version')) + + +class DscFile(object): + """Keeps all needed data read from a dscfile""" + compressions = r"(%s)" % '|'.join(UpstreamSource.known_compressions()) + pkg_re = re.compile(r'Source:\s+(?P<pkg>.+)\s*') + version_re = re.compile(r'Version:\s((?P<epoch>\d+)\:)?(?P<version>[%s]+)\s*$' % debian_version_chars) + tar_re = re.compile(r'^\s\w+\s\d+\s+(?P<tar>[^_]+_[^_]+(\.orig)?\.tar\.%s)' % compressions) + diff_re = re.compile(r'^\s\w+\s\d+\s+(?P<diff>[^_]+_[^_]+\.diff.(gz|bz2))') + deb_tgz_re = re.compile(r'^\s\w+\s\d+\s+(?P<deb_tgz>[^_]+_[^_]+\.debian.tar.%s)' % compressions) + format_re = re.compile(r'Format:\s+(?P<format>[0-9.]+)\s*') + + def __init__(self, dscfile): + self.pkg = "" + self.tgz = "" + self.diff = "" + self.deb_tgz = "" + self.pkgformat = "1.0" + self.debian_version = "" + self.upstream_version = "" + self.native = False + self.dscfile = os.path.abspath(dscfile) + + f = file(self.dscfile) + fromdir = os.path.dirname(os.path.abspath(dscfile)) + for line in f: + m = self.version_re.match(line) + if m and not self.upstream_version: + if '-' in m.group('version'): + self.debian_version = m.group('version').split("-")[-1] + self.upstream_version = "-".join(m.group('version').split("-")[0:-1]) + self.native = False + else: + self.native = True # Debian native package + self.upstream_version = m.group('version') + if m.group('epoch'): + self.epoch = m.group('epoch') + else: + self.epoch = "" + continue + m = self.pkg_re.match(line) + if m: + self.pkg = m.group('pkg') + continue + m = self.deb_tgz_re.match(line) + if m: + self.deb_tgz = os.path.join(fromdir, m.group('deb_tgz')) + continue + m = self.tar_re.match(line) + if m: + self.tgz = os.path.join(fromdir, m.group('tar')) + continue + m = self.diff_re.match(line) + if m: + self.diff = os.path.join(fromdir, m.group('diff')) + continue + m = self.format_re.match(line) + if m: + self.pkgformat = m.group('format') + continue + f.close() + + if not self.pkg: + raise GbpError, "Cannot parse package name from '%s'" % self.dscfile + elif not self.tgz: + raise GbpError, "Cannot parse archive name from '%s'" % self.dscfile + if not self.upstream_version: + raise GbpError, "Cannot parse version number from '%s'" % self.dscfile + if not self.native and not self.debian_version: + raise GbpError, "Cannot parse Debian version number from '%s'" % self.dscfile + + def _get_version(self): + version = [ "", self.epoch + ":" ][len(self.epoch) > 0] + if self.native: + version += self.upstream_version + else: + version += "%s-%s" % (self.upstream_version, self.debian_version) + return version + + version = property(_get_version) + + def __str__(self): + return "<%s object %s>" % (self.__class__.__name__, self.dscfile) + + +def parse_dsc(dscfile): + """parse dsc by creating a DscFile object""" + try: + dsc = DscFile(dscfile) + except IOError, err: + raise GbpError, "Error reading dsc file: %s" % err + + return dsc + +def parse_changelog_repo(repo, branch, filename): + """ + Parse the changelog file from given branch in the git + repository. + """ + try: + # Note that we could just pass in the branch:filename notation + # to show as well, but we want to check if the branch / filename + # exists first, so we can give a separate error from other + # repository errors. + sha = repo.rev_parse("%s:%s" % (branch, filename)) + except GitRepositoryError: + raise NoChangelogError, "Changelog %s not found in branch %s" % (filename, branch) + + lines = repo.show(sha) + return parse_changelog('\n'.join(lines)) + +def parse_changelog(contents=None, filename=None): + """ + Parse the content of a changelog file. Either contents, containing + the contents of a changelog file, or filename, pointing to a + changelog file must be passed. + + Returns: + + cp['Version']: full version string including epoch + cp['Upstream-Version']: upstream version, if not debian native + cp['Debian-Version']: debian release + cp['Epoch']: epoch, if any + cp['NoEpoch-Version']: full version string excluding epoch + """ + # Check that either contents or filename is passed (but not both) + if (not filename and not contents) or (filename and contents): + raise Exception("Either filename or contents must be passed to parse_changelog") + + # If a filename was passed, check if it exists + if filename and not os.access(filename, os.F_OK): + raise NoChangelogError, "Changelog %s not found" % (filename, ) + + # If no filename was passed, let parse_changelog read from stdin + if not filename: + filename = '-' + + # Note that if contents is None, stdin will just be closed right + # away by communicate. + cmd = subprocess.Popen(['dpkg-parsechangelog', '-l%s' % filename], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (output, errors) = cmd.communicate(contents) + if cmd.returncode: + raise ParseChangeLogError, "Failed to parse changelog. dpkg-parsechangelog said:\n%s" % (errors, ) + # Parse the result of dpkg-parsechangelog (which looks like + # email headers) + cp = email.message_from_string(output) + try: + if ':' in cp['Version']: + cp['Epoch'], cp['NoEpoch-Version'] = cp['Version'].split(':', 1) + else: + cp['NoEpoch-Version'] = cp['Version'] + if '-' in cp['NoEpoch-Version']: + cp['Upstream-Version'], cp['Debian-Version'] = cp['NoEpoch-Version'].rsplit('-', 1) + else: + cp['Debian-Version'] = cp['NoEpoch-Version'] + except TypeError: + raise ParseChangeLogError, output.split('\n')[0] + return cp + + +def orig_file(cp, compression): + """ + The name of the orig file belonging to changelog cp + + >>> orig_file({'Source': 'foo', 'Upstream-Version': '1.0'}, "bzip2") + 'foo_1.0.orig.tar.bz2' + >>> orig_file({'Source': 'bar', 'Upstream-Version': '0.0~git1234'}, "xz") + 'bar_0.0~git1234.orig.tar.xz' + """ + ext = compressor_opts[compression][1] + return "%s_%s.orig.tar.%s" % (cp['Source'], cp['Upstream-Version'], ext) + + +def is_native(cp): + """ + Is this a debian native package + + >>> is_native(dict(Version="1")) + True + >>> is_native(dict(Version="1-1")) + False + """ + return not '-' in cp['Version'] + +def is_valid_packagename(name): + "Is this a valid Debian package name?" + return packagename_re.match(name) + +def is_valid_upstreamversion(version): + "Is this a valid upstream version number?" + return upstreamversion_re.match(version) + +def get_compression(orig_file): + """ + Given an orig file return the compression used + + >>> get_compression("abc.tar.gz") + 'gzip' + >>> get_compression("abc.tar.bz2") + 'bzip2' + >>> get_compression("abc.tar.foo") + >>> get_compression("abc") + """ + try: + ext = orig_file.rsplit('.',1)[1] + except IndexError: + return None + for (c, o) in compressor_opts.iteritems(): + if o[1] == ext: + return c + return None + + +def has_epoch(cp): + """ + Does the topmost version number in the changelog contain an epoch + >>> has_epoch(dict(Epoch="1")) + True + >>> has_epoch(dict()) + False + """ + return cp.has_key("Epoch") + + +def has_orig(cp, compression, dir): + "Check if orig.tar.gz exists in dir" + try: + os.stat( os.path.join(dir, orig_file(cp, compression)) ) + except OSError: + return False + return True + +def symlink_orig(cp, compression, orig_dir, output_dir, force=False): + """ + symlink orig.tar.gz from orig_dir to output_dir + @return: True if link was created or src == dst + False in case of error or src doesn't exist + """ + orig_dir = os.path.abspath(orig_dir) + output_dir = os.path.abspath(output_dir) + + if orig_dir == output_dir: + return True + + src = os.path.join(orig_dir, orig_file(cp, compression)) + dst = os.path.join(output_dir, orig_file(cp, compression)) + if not os.access(src, os.F_OK): + return False + try: + if os.access(dst, os.F_OK) and force: + os.unlink(dst) + os.symlink(src, dst) + except OSError: + return False + return True + + +def parse_uscan(out): + """ + Parse the uscan output return (True, tarball) if a new version was + downloaded and could be located. If the tarball can't be located it returns + (True, None). Returns (False, None) if the current version is up to date. + + >>> parse_uscan("<status>up to date</status>") + (False, None) + >>> parse_uscan("<target>virt-viewer_0.4.0.orig.tar.gz</target>") + (True, '../virt-viewer_0.4.0.orig.tar.gz') + + @param out: uscan output + @type out: string + @return: status and tarball name + @rtype: tuple + """ + source = None + if "<status>up to date</status>" in out: + return (False, None) + else: + # Check if uscan downloaded something + for row in out.split("\n"): + # uscan >= 2.10.70 has a target element: + m = re.match(r"<target>(.*)</target>", row) + if m: + source = '../%s' % m.group(1) + break + elif row.startswith('<messages>'): + m = re.match(r".*symlinked ([^\s]+) to it", row) + if m: + source = "../%s" % m.group(1) + break + m = re.match(r"Successfully downloaded updated package ([^<]+)", row) + if m: + source = "../%s" % m.group(1) + break + # try to determine the already downloaded sources name + else: + d = {} + for row in out.split("\n"): + for n in ('package', 'upstream-version', 'upstream-url'): + m = re.match("<%s>(.*)</%s>" % (n,n), row) + if m: + d[n] = m.group(1) + d["ext"] = os.path.splitext(d['upstream-url'])[1] + # We want the name of the orig tarball if possible + source = "../%(package)s_%(upstream-version)s.orig.tar%(ext)s" % d + if not os.path.exists(source): + # Fall back to the sources name otherwise + source = "../%s" % d['upstream-url'].rsplit('/',1)[1] + print source + if not os.path.exists(source): + source = None + return (True, source) + + +def do_uscan(): + """invoke uscan to fetch a new upstream version""" + p = subprocess.Popen(['uscan', '--symlink', '--destdir=..', '--dehs'], stdout=subprocess.PIPE) + out = p.communicate()[0] + return parse_uscan(out) + + +def get_arch(): + pipe = subprocess.Popen(["dpkg", "--print-architecture"], shell=False, stdout=subprocess.PIPE) + arch = pipe.stdout.readline().strip() + return arch + + +def compare_versions(version1, version2): + """compares to Debian versionnumbers suitable for sort()""" + return DpkgCompareVersions()(version1, version2) + + +# vim:et:ts=4:sw=4:et:sts=4:ai:set list listchars=tab\:»·,trail\:·: |