From 29b8b036102f3bb0cb4fa19d66bf8fae274d5537 Mon Sep 17 00:00:00 2001 From: Guido Günther Date: Tue, 22 Nov 2011 16:49:30 +0100 Subject: Move debian related helpers into submodule --- debian/rules | 3 +- gbp/deb.py | 617 -------------------------------------- gbp/deb/__init__.py | 618 +++++++++++++++++++++++++++++++++++++++ setup.py | 4 +- tests/06_test_upstream_source.py | 4 +- 5 files changed, 624 insertions(+), 622 deletions(-) delete mode 100644 gbp/deb.py create mode 100644 gbp/deb/__init__.py diff --git a/debian/rules b/debian/rules index bc55232..d416fa4 100755 --- a/debian/rules +++ b/debian/rules @@ -23,7 +23,8 @@ override_dh_auto_test: PYTHONPATH=. \ python setup.py nosetests - PYTHONPATH=. pychecker $(PYCHECKER_ARGS) -q gbp gbp.scripts gbp.git + PYTHONPATH=. pychecker $(PYCHECKER_ARGS) -q \ + gbp gbp.scripts gbp.git gbp.deb override_dh_auto_build: dh_auto_build diff --git a/gbp/deb.py b/gbp/deb.py deleted file mode 100644 index e7d1b0e..0000000 --- a/gbp/deb.py +++ /dev/null @@ -1,617 +0,0 @@ -# vim: set fileencoding=utf-8 : -# -# (C) 2006,2007,2011 Guido Günther -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -"""provides some debian source package related helpers""" - -import commands -import email -import os -import re -import subprocess -import sys -import glob -import command_wrappers as gbpc -from errors import GbpError -from gbp.git import GitRepositoryError - -# When trying to parse a version-number from a dsc or changes file, these are -# the valid characters. -debian_version_chars = 'a-zA-Z\d.~+-' - -# Valid package names according to Debian Policy Manual 5.6.1: -# "Package names (both source and binary, see Package, Section 5.6.7) -# must consist only of lower case letters (a-z), digits (0-9), plus (+) -# and minus (-) signs, and periods (.). They must be at least two -# characters long and must start with an alphanumeric character." -packagename_re = re.compile("^[a-z0-9][a-z0-9\.\+\-]+$") -packagename_msg = """Package names must be at least two characters long, start with an -alphanumeric and can only containg lower case letters (a-z), digits -(0-9), plus signs (+), minus signs (-), and periods (.)""" - -# Valid upstream versions according to Debian Policy Manual 5.6.12: -# "The upstream_version may contain only alphanumerics[32] and the -# characters . + - : ~ (full stop, plus, hyphen, colon, tilde) and -# should start with a digit. If there is no debian_revision then hyphens -# are not allowed; if there is no epoch then colons are not allowed." -# Since we don't know about any epochs and debian revisions yet, the -# last two conditions are not checked. -upstreamversion_re = re.compile("^[0-9][a-z0-9\.\+\-\:\~]*$") -upstreamversion_msg = """Upstream version numbers must start with a digit and can only containg lower case -letters (a-z), digits (0-9), full stops (.), plus signs (+), minus signs -(-), colons (:) and tildes (~)""" - -# compression types, extra options and extensions -compressor_opts = { 'gzip' : [ '-n', 'gz' ], - 'bzip2' : [ '', 'bz2' ], - 'lzma' : [ '', 'lzma' ], - 'xz' : [ '', 'xz' ] } - -# Map frequently used names of compression types to the internal ones: -compressor_aliases = { 'bz2' : 'bzip2', - 'gz' : 'gzip', } - -class NoChangelogError(Exception): - """no changelog found""" - pass - -class ParseChangeLogError(Exception): - """problem parsing changelog""" - pass - - -class DpkgCompareVersions(gbpc.Command): - cmd='/usr/bin/dpkg' - - def __init__(self): - if not os.access(self.cmd, os.X_OK): - raise GbpError, "%s not found - cannot use compare versions" % self.cmd - gbpc.Command.__init__(self, self.cmd, ['--compare-versions']) - - def __call__(self, version1, version2): - self.run_error = "Couldn't compare %s with %s" % (version1, version2) - res = gbpc.Command.call(self, [ version1, 'lt', version2 ]) - if res not in [ 0, 1 ]: - raise gbpc.CommandExecFailed, "%s: bad return code %d" % (self.run_error, res) - if res == 0: - return -1 - elif res == 1: - res = gbpc.Command.call(self, [ version1, 'gt', version2 ]) - if res not in [ 0, 1 ]: - raise gbpc.CommandExecFailed, "%s: bad return code %d" % (self.run_error, res) - if res == 0: - return 1 - return 0 - - -class UpstreamSource(object): - """ - Upstream source. Can be either an unpacked dir, a tarball or another type - or archive - - @cvar is_dir: are the upstream sources an unpacked dir - @type is_dir: boolean - @cvar _orig: are the upstream sources already suitable as an upstream - tarball - @type _irog: boolen - @cvar _path: path to the upstream sources - @type _path: string - @cvar _unpacked: path to the unpacked source tree - @type _unpacked: string - """ - def __init__(self, name, unpacked=None): - self.is_dir = False - self._orig = False - self._path = name - self.unpacked = unpacked - - self.is_dir = [False, True][os.path.isdir(name)] - self._check_orig() - if self.is_dir: - self.unpacked = self.path - - def _check_orig(self): - """Check if archive can be used as orig tarball""" - if self.is_dir: - self._orig = False - return - - parts = self._path.split('.') - try: - if parts[-2] == 'tar': - if (parts[-1] in compressor_opts or - parts[-1] in compressor_aliases): - self._orig = True - except IndexError: - self._orig = False - - @property - def is_orig(self): - return self._orig - - @property - def path(self): - return self._path.rstrip('/') - - def unpack(self, dir, filters=[]): - """ - Unpack packed upstream sources into a given directory - and determine the toplevel of the source tree. - """ - if self.is_dir: - raise GbpError, "Cannot unpack directory %s" % self.path - - if not filters: - filters = [] - - if type(filters) != type([]): - raise GbpError, "Filters must be a list" - - self._unpack_archive(dir, filters) - self.unpacked = self._unpacked_toplevel(dir) - - def _unpack_archive(self, dir, filters): - """ - Unpack packed upstream sources into a given directory. - """ - ext = os.path.splitext(self.path)[1] - if ext in [ ".zip", ".xpi" ]: - self._unpack_zip(dir) - else: - self._unpack_tar(dir, filters) - - def _unpack_zip(self, dir): - try: - gbpc.UnpackZipArchive(self.path, dir)() - except gbpc.CommandExecFailed: - raise GbpError, "Unpacking of %s failed" % self.path - - def _unpacked_toplevel(self, dir): - """unpacked archives can contain a leading directory not""" - unpacked = glob.glob('%s/*' % dir) - unpacked.extend(glob.glob("%s/.*" % dir)) # include hidden files and folders - # Check that dir contains nothing but a single folder: - if len(unpacked) == 1 and os.path.isdir(unpacked[0]): - return unpacked[0] - else: - return dir - - def _unpack_tar(self, dir, filters): - """ - unpack a .orig.tar.gz to tmpdir, leave the cleanup to the caller in case of - an error - """ - try: - unpackArchive = gbpc.UnpackTarArchive(self.path, dir, filters) - unpackArchive() - except gbpc.CommandExecFailed: - # unpackArchive already printed an error message - raise GbpError - - def pack(self, newarchive, filters=[]): - """ - recreate a new archive from the current one - - @param newarchive: the name of the new archive - @type newarchive: string - @param filters: tar filters to apply - @type filters: array of strings - @return: the new upstream source - @rtype: UpstreamSource - """ - if not self.unpacked: - raise GbpError, "Need an unpacked source tree to pack" - - if not filters: - filters = [] - - if type(filters) != type([]): - raise GbpError, "Filters must be a list" - - try: - unpacked = self.unpacked.rstrip('/') - repackArchive = gbpc.PackTarArchive(newarchive, - os.path.dirname(unpacked), - os.path.basename(unpacked), - filters) - repackArchive() - except gbpc.CommandExecFailed: - # repackArchive already printed an error - raise GbpError - return UpstreamSource(newarchive) - - @staticmethod - def known_compressions(): - return [ args[1][-1] for args in compressor_opts.items() ] - - def guess_version(self, extra_regex=r''): - """ - Guess the package name and version from the filename of an upstream - archive. - - @param extra_regex: additional regex to apply, needs a 'package' and a - 'version' group - @return: (package name, version) or None. - @rtype: tuple - - >>> UpstreamSource('foo-bar_0.2.orig.tar.gz').guess_version() - ('foo-bar', '0.2') - >>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version() - >>> UpstreamSource('git-bar-0.2.tar.gz').guess_version() - ('git-bar', '0.2') - >>> UpstreamSource('git-bar-0.2-rc1.tar.gz').guess_version() - ('git-bar', '0.2-rc1') - >>> UpstreamSource('git-bar-0.2:~-rc1.tar.gz').guess_version() - ('git-bar', '0.2:~-rc1') - >>> UpstreamSource('git-Bar-0A2d:rc1.tar.bz2').guess_version() - ('git-Bar', '0A2d:rc1') - >>> UpstreamSource('git-1.tar.bz2').guess_version() - ('git', '1') - >>> UpstreamSource('kvm_87+dfsg.orig.tar.gz').guess_version() - ('kvm', '87+dfsg') - >>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version() - >>> UpstreamSource('foo-Bar-a.b.tar.gz').guess_version() - >>> UpstreamSource('foo-bar_0.2.orig.tar.xz').guess_version() - ('foo-bar', '0.2') - >>> UpstreamSource('foo-bar_0.2.orig.tar.lzma').guess_version() - ('foo-bar', '0.2') - """ - version_chars = r'[a-zA-Z\d\.\~\-\:\+]' - extensions = r'\.tar\.(%s)' % "|".join(self.known_compressions()) - - version_filters = map ( lambda x: x % (version_chars, extensions), - ( # Debian package_.orig.tar.gz: - r'^(?P[a-z\d\.\+\-]+)_(?P%s+)\.orig%s', - # Upstream package-.tar.gz: - r'^(?P[a-zA-Z\d\.\+\-]+)-(?P[0-9]%s*)%s')) - if extra_regex: - version_filters = extra_regex + version_filters - - for filter in version_filters: - m = re.match(filter, os.path.basename(self.path)) - if m: - return (m.group('package'), m.group('version')) - - -class DscFile(object): - """Keeps all needed data read from a dscfile""" - compressions = r"(%s)" % '|'.join(UpstreamSource.known_compressions()) - pkg_re = re.compile(r'Source:\s+(?P.+)\s*') - version_re = re.compile(r'Version:\s((?P\d+)\:)?(?P[%s]+)\s*$' % debian_version_chars) - tar_re = re.compile(r'^\s\w+\s\d+\s+(?P[^_]+_[^_]+(\.orig)?\.tar\.%s)' % compressions) - diff_re = re.compile(r'^\s\w+\s\d+\s+(?P[^_]+_[^_]+\.diff.(gz|bz2))') - deb_tgz_re = re.compile(r'^\s\w+\s\d+\s+(?P[^_]+_[^_]+\.debian.tar.%s)' % compressions) - format_re = re.compile(r'Format:\s+(?P[0-9.]+)\s*') - - def __init__(self, dscfile): - self.pkg = "" - self.tgz = "" - self.diff = "" - self.deb_tgz = "" - self.pkgformat = "1.0" - self.debian_version = "" - self.upstream_version = "" - self.native = False - self.dscfile = os.path.abspath(dscfile) - - f = file(self.dscfile) - fromdir = os.path.dirname(os.path.abspath(dscfile)) - for line in f: - m = self.version_re.match(line) - if m and not self.upstream_version: - if '-' in m.group('version'): - self.debian_version = m.group('version').split("-")[-1] - self.upstream_version = "-".join(m.group('version').split("-")[0:-1]) - self.native = False - else: - self.native = True # Debian native package - self.upstream_version = m.group('version') - if m.group('epoch'): - self.epoch = m.group('epoch') - else: - self.epoch = "" - continue - m = self.pkg_re.match(line) - if m: - self.pkg = m.group('pkg') - continue - m = self.deb_tgz_re.match(line) - if m: - self.deb_tgz = os.path.join(fromdir, m.group('deb_tgz')) - continue - m = self.tar_re.match(line) - if m: - self.tgz = os.path.join(fromdir, m.group('tar')) - continue - m = self.diff_re.match(line) - if m: - self.diff = os.path.join(fromdir, m.group('diff')) - continue - m = self.format_re.match(line) - if m: - self.pkgformat = m.group('format') - continue - f.close() - - if not self.pkg: - raise GbpError, "Cannot parse package name from '%s'" % self.dscfile - elif not self.tgz: - raise GbpError, "Cannot parse archive name from '%s'" % self.dscfile - if not self.upstream_version: - raise GbpError, "Cannot parse version number from '%s'" % self.dscfile - if not self.native and not self.debian_version: - raise GbpError, "Cannot parse Debian version number from '%s'" % self.dscfile - - def _get_version(self): - version = [ "", self.epoch + ":" ][len(self.epoch) > 0] - if self.native: - version += self.upstream_version - else: - version += "%s-%s" % (self.upstream_version, self.debian_version) - return version - - version = property(_get_version) - - def __str__(self): - return "<%s object %s>" % (self.__class__.__name__, self.dscfile) - - -def parse_dsc(dscfile): - """parse dsc by creating a DscFile object""" - try: - dsc = DscFile(dscfile) - except IOError, err: - raise GbpError, "Error reading dsc file: %s" % err - - return dsc - -def parse_changelog_repo(repo, branch, filename): - """ - Parse the changelog file from given branch in the git - repository. - """ - try: - # Note that we could just pass in the branch:filename notation - # to show as well, but we want to check if the branch / filename - # exists first, so we can give a separate error from other - # repository errors. - sha = repo.rev_parse("%s:%s" % (branch, filename)) - except GitRepositoryError: - raise NoChangelogError, "Changelog %s not found in branch %s" % (filename, branch) - - lines = repo.show(sha) - return parse_changelog('\n'.join(lines)) - -def parse_changelog(contents=None, filename=None): - """ - Parse the content of a changelog file. Either contents, containing - the contents of a changelog file, or filename, pointing to a - changelog file must be passed. - - Returns: - - cp['Version']: full version string including epoch - cp['Upstream-Version']: upstream version, if not debian native - cp['Debian-Version']: debian release - cp['Epoch']: epoch, if any - cp['NoEpoch-Version']: full version string excluding epoch - """ - # Check that either contents or filename is passed (but not both) - if (not filename and not contents) or (filename and contents): - raise Exception("Either filename or contents must be passed to parse_changelog") - - # If a filename was passed, check if it exists - if filename and not os.access(filename, os.F_OK): - raise NoChangelogError, "Changelog %s not found" % (filename, ) - - # If no filename was passed, let parse_changelog read from stdin - if not filename: - filename = '-' - - # Note that if contents is None, stdin will just be closed right - # away by communicate. - cmd = subprocess.Popen(['dpkg-parsechangelog', '-l%s' % filename], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - (output, errors) = cmd.communicate(contents) - if cmd.returncode: - raise ParseChangeLogError, "Failed to parse changelog. dpkg-parsechangelog said:\n%s" % (errors, ) - # Parse the result of dpkg-parsechangelog (which looks like - # email headers) - cp = email.message_from_string(output) - try: - if ':' in cp['Version']: - cp['Epoch'], cp['NoEpoch-Version'] = cp['Version'].split(':', 1) - else: - cp['NoEpoch-Version'] = cp['Version'] - if '-' in cp['NoEpoch-Version']: - cp['Upstream-Version'], cp['Debian-Version'] = cp['NoEpoch-Version'].rsplit('-', 1) - else: - cp['Debian-Version'] = cp['NoEpoch-Version'] - except TypeError: - raise ParseChangeLogError, output.split('\n')[0] - return cp - - -def orig_file(cp, compression): - """ - The name of the orig file belonging to changelog cp - - >>> orig_file({'Source': 'foo', 'Upstream-Version': '1.0'}, "bzip2") - 'foo_1.0.orig.tar.bz2' - >>> orig_file({'Source': 'bar', 'Upstream-Version': '0.0~git1234'}, "xz") - 'bar_0.0~git1234.orig.tar.xz' - """ - ext = compressor_opts[compression][1] - return "%s_%s.orig.tar.%s" % (cp['Source'], cp['Upstream-Version'], ext) - - -def is_native(cp): - """ - Is this a debian native package - - >>> is_native(dict(Version="1")) - True - >>> is_native(dict(Version="1-1")) - False - """ - return not '-' in cp['Version'] - -def is_valid_packagename(name): - "Is this a valid Debian package name?" - return packagename_re.match(name) - -def is_valid_upstreamversion(version): - "Is this a valid upstream version number?" - return upstreamversion_re.match(version) - -def get_compression(orig_file): - """ - Given an orig file return the compression used - - >>> get_compression("abc.tar.gz") - 'gzip' - >>> get_compression("abc.tar.bz2") - 'bzip2' - >>> get_compression("abc.tar.foo") - >>> get_compression("abc") - """ - try: - ext = orig_file.rsplit('.',1)[1] - except IndexError: - return None - for (c, o) in compressor_opts.iteritems(): - if o[1] == ext: - return c - return None - - -def has_epoch(cp): - """ - Does the topmost version number in the changelog contain an epoch - >>> has_epoch(dict(Epoch="1")) - True - >>> has_epoch(dict()) - False - """ - return cp.has_key("Epoch") - - -def has_orig(cp, compression, dir): - "Check if orig.tar.gz exists in dir" - try: - os.stat( os.path.join(dir, orig_file(cp, compression)) ) - except OSError: - return False - return True - -def symlink_orig(cp, compression, orig_dir, output_dir, force=False): - """ - symlink orig.tar.gz from orig_dir to output_dir - @return: True if link was created or src == dst - False in case of error or src doesn't exist - """ - orig_dir = os.path.abspath(orig_dir) - output_dir = os.path.abspath(output_dir) - - if orig_dir == output_dir: - return True - - src = os.path.join(orig_dir, orig_file(cp, compression)) - dst = os.path.join(output_dir, orig_file(cp, compression)) - if not os.access(src, os.F_OK): - return False - try: - if os.access(dst, os.F_OK) and force: - os.unlink(dst) - os.symlink(src, dst) - except OSError: - return False - return True - - -def parse_uscan(out): - """ - Parse the uscan output return (True, tarball) if a new version was - downloaded and could be located. If the tarball can't be located it returns - (True, None). Returns (False, None) if the current version is up to date. - - >>> parse_uscan("up to date") - (False, None) - >>> parse_uscan("virt-viewer_0.4.0.orig.tar.gz") - (True, '../virt-viewer_0.4.0.orig.tar.gz') - - @param out: uscan output - @type out: string - @return: status and tarball name - @rtype: tuple - """ - source = None - if "up to date" in out: - return (False, None) - else: - # Check if uscan downloaded something - for row in out.split("\n"): - # uscan >= 2.10.70 has a target element: - m = re.match(r"(.*)", row) - if m: - source = '../%s' % m.group(1) - break - elif row.startswith(''): - m = re.match(r".*symlinked ([^\s]+) to it", row) - if m: - source = "../%s" % m.group(1) - break - m = re.match(r"Successfully downloaded updated package ([^<]+)", row) - if m: - source = "../%s" % m.group(1) - break - # try to determine the already downloaded sources name - else: - d = {} - for row in out.split("\n"): - for n in ('package', 'upstream-version', 'upstream-url'): - m = re.match("<%s>(.*)" % (n,n), row) - if m: - d[n] = m.group(1) - d["ext"] = os.path.splitext(d['upstream-url'])[1] - # We want the name of the orig tarball if possible - source = "../%(package)s_%(upstream-version)s.orig.tar%(ext)s" % d - if not os.path.exists(source): - # Fall back to the sources name otherwise - source = "../%s" % d['upstream-url'].rsplit('/',1)[1] - print source - if not os.path.exists(source): - source = None - return (True, source) - - -def do_uscan(): - """invoke uscan to fetch a new upstream version""" - p = subprocess.Popen(['uscan', '--symlink', '--destdir=..', '--dehs'], stdout=subprocess.PIPE) - out = p.communicate()[0] - return parse_uscan(out) - - -def get_arch(): - pipe = subprocess.Popen(["dpkg", "--print-architecture"], shell=False, stdout=subprocess.PIPE) - arch = pipe.stdout.readline().strip() - return arch - - -def compare_versions(version1, version2): - """compares to Debian versionnumbers suitable for sort()""" - return DpkgCompareVersions()(version1, version2) - - -# vim:et:ts=4:sw=4:et:sts=4:ai:set list listchars=tab\:»·,trail\:·: diff --git a/gbp/deb/__init__.py b/gbp/deb/__init__.py new file mode 100644 index 0000000..881d1ae --- /dev/null +++ b/gbp/deb/__init__.py @@ -0,0 +1,618 @@ +# vim: set fileencoding=utf-8 : +# +# (C) 2006,2007,2011 Guido Günther +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +"""provides some debian source package related helpers""" + +import commands +import email +import os +import re +import subprocess +import sys +import glob + +import gbp.command_wrappers as gbpc +from gbp.errors import GbpError +from gbp.git import GitRepositoryError + +# When trying to parse a version-number from a dsc or changes file, these are +# the valid characters. +debian_version_chars = 'a-zA-Z\d.~+-' + +# Valid package names according to Debian Policy Manual 5.6.1: +# "Package names (both source and binary, see Package, Section 5.6.7) +# must consist only of lower case letters (a-z), digits (0-9), plus (+) +# and minus (-) signs, and periods (.). They must be at least two +# characters long and must start with an alphanumeric character." +packagename_re = re.compile("^[a-z0-9][a-z0-9\.\+\-]+$") +packagename_msg = """Package names must be at least two characters long, start with an +alphanumeric and can only containg lower case letters (a-z), digits +(0-9), plus signs (+), minus signs (-), and periods (.)""" + +# Valid upstream versions according to Debian Policy Manual 5.6.12: +# "The upstream_version may contain only alphanumerics[32] and the +# characters . + - : ~ (full stop, plus, hyphen, colon, tilde) and +# should start with a digit. If there is no debian_revision then hyphens +# are not allowed; if there is no epoch then colons are not allowed." +# Since we don't know about any epochs and debian revisions yet, the +# last two conditions are not checked. +upstreamversion_re = re.compile("^[0-9][a-z0-9\.\+\-\:\~]*$") +upstreamversion_msg = """Upstream version numbers must start with a digit and can only containg lower case +letters (a-z), digits (0-9), full stops (.), plus signs (+), minus signs +(-), colons (:) and tildes (~)""" + +# compression types, extra options and extensions +compressor_opts = { 'gzip' : [ '-n', 'gz' ], + 'bzip2' : [ '', 'bz2' ], + 'lzma' : [ '', 'lzma' ], + 'xz' : [ '', 'xz' ] } + +# Map frequently used names of compression types to the internal ones: +compressor_aliases = { 'bz2' : 'bzip2', + 'gz' : 'gzip', } + +class NoChangelogError(Exception): + """no changelog found""" + pass + +class ParseChangeLogError(Exception): + """problem parsing changelog""" + pass + + +class DpkgCompareVersions(gbpc.Command): + cmd='/usr/bin/dpkg' + + def __init__(self): + if not os.access(self.cmd, os.X_OK): + raise GbpError, "%s not found - cannot use compare versions" % self.cmd + gbpc.Command.__init__(self, self.cmd, ['--compare-versions']) + + def __call__(self, version1, version2): + self.run_error = "Couldn't compare %s with %s" % (version1, version2) + res = gbpc.Command.call(self, [ version1, 'lt', version2 ]) + if res not in [ 0, 1 ]: + raise gbpc.CommandExecFailed, "%s: bad return code %d" % (self.run_error, res) + if res == 0: + return -1 + elif res == 1: + res = gbpc.Command.call(self, [ version1, 'gt', version2 ]) + if res not in [ 0, 1 ]: + raise gbpc.CommandExecFailed, "%s: bad return code %d" % (self.run_error, res) + if res == 0: + return 1 + return 0 + + +class UpstreamSource(object): + """ + Upstream source. Can be either an unpacked dir, a tarball or another type + of archive + + @cvar is_dir: are the upstream sources an unpacked dir + @type is_dir: boolean + @cvar _orig: are the upstream sources already suitable as an upstream + tarball + @type _orig: boolean + @cvar _path: path to the upstream sources + @type _path: string + @cvar _unpacked: path to the unpacked source tree + @type _unpacked: string + """ + def __init__(self, name, unpacked=None): + self.is_dir = False + self._orig = False + self._path = name + self.unpacked = unpacked + + self.is_dir = [False, True][os.path.isdir(name)] + self._check_orig() + if self.is_dir: + self.unpacked = self.path + + def _check_orig(self): + """Check if archive can be used as orig tarball""" + if self.is_dir: + self._orig = False + return + + parts = self._path.split('.') + try: + if parts[-2] == 'tar': + if (parts[-1] in compressor_opts or + parts[-1] in compressor_aliases): + self._orig = True + except IndexError: + self._orig = False + + @property + def is_orig(self): + return self._orig + + @property + def path(self): + return self._path.rstrip('/') + + def unpack(self, dir, filters=[]): + """ + Unpack packed upstream sources into a given directory + and determine the toplevel of the source tree. + """ + if self.is_dir: + raise GbpError, "Cannot unpack directory %s" % self.path + + if not filters: + filters = [] + + if type(filters) != type([]): + raise GbpError, "Filters must be a list" + + self._unpack_archive(dir, filters) + self.unpacked = self._unpacked_toplevel(dir) + + def _unpack_archive(self, dir, filters): + """ + Unpack packed upstream sources into a given directory. + """ + ext = os.path.splitext(self.path)[1] + if ext in [ ".zip", ".xpi" ]: + self._unpack_zip(dir) + else: + self._unpack_tar(dir, filters) + + def _unpack_zip(self, dir): + try: + gbpc.UnpackZipArchive(self.path, dir)() + except gbpc.CommandExecFailed: + raise GbpError, "Unpacking of %s failed" % self.path + + def _unpacked_toplevel(self, dir): + """unpacked archives can contain a leading directory or not""" + unpacked = glob.glob('%s/*' % dir) + unpacked.extend(glob.glob("%s/.*" % dir)) # include hidden files and folders + # Check that dir contains nothing but a single folder: + if len(unpacked) == 1 and os.path.isdir(unpacked[0]): + return unpacked[0] + else: + return dir + + def _unpack_tar(self, dir, filters): + """ + unpack a .orig.tar.gz to tmpdir, leave the cleanup to the caller in case of + an error + """ + try: + unpackArchive = gbpc.UnpackTarArchive(self.path, dir, filters) + unpackArchive() + except gbpc.CommandExecFailed: + # unpackArchive already printed an error message + raise GbpError + + def pack(self, newarchive, filters=[]): + """ + recreate a new archive from the current one + + @param newarchive: the name of the new archive + @type newarchive: string + @param filters: tar filters to apply + @type filters: array of strings + @return: the new upstream source + @rtype: UpstreamSource + """ + if not self.unpacked: + raise GbpError, "Need an unpacked source tree to pack" + + if not filters: + filters = [] + + if type(filters) != type([]): + raise GbpError, "Filters must be a list" + + try: + unpacked = self.unpacked.rstrip('/') + repackArchive = gbpc.PackTarArchive(newarchive, + os.path.dirname(unpacked), + os.path.basename(unpacked), + filters) + repackArchive() + except gbpc.CommandExecFailed: + # repackArchive already printed an error + raise GbpError + return UpstreamSource(newarchive) + + @staticmethod + def known_compressions(): + return [ args[1][-1] for args in compressor_opts.items() ] + + def guess_version(self, extra_regex=r''): + """ + Guess the package name and version from the filename of an upstream + archive. + + >>> UpstreamSource('foo-bar_0.2.orig.tar.gz').guess_version() + ('foo-bar', '0.2') + >>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version() + >>> UpstreamSource('git-bar-0.2.tar.gz').guess_version() + ('git-bar', '0.2') + >>> UpstreamSource('git-bar-0.2-rc1.tar.gz').guess_version() + ('git-bar', '0.2-rc1') + >>> UpstreamSource('git-bar-0.2:~-rc1.tar.gz').guess_version() + ('git-bar', '0.2:~-rc1') + >>> UpstreamSource('git-Bar-0A2d:rc1.tar.bz2').guess_version() + ('git-Bar', '0A2d:rc1') + >>> UpstreamSource('git-1.tar.bz2').guess_version() + ('git', '1') + >>> UpstreamSource('kvm_87+dfsg.orig.tar.gz').guess_version() + ('kvm', '87+dfsg') + >>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version() + >>> UpstreamSource('foo-Bar-a.b.tar.gz').guess_version() + >>> UpstreamSource('foo-bar_0.2.orig.tar.xz').guess_version() + ('foo-bar', '0.2') + >>> UpstreamSource('foo-bar_0.2.orig.tar.lzma').guess_version() + ('foo-bar', '0.2') + + @param extra_regex: additional regex to apply, needs a 'package' and a + 'version' group + @return: (package name, version) or None. + @rtype: tuple + """ + version_chars = r'[a-zA-Z\d\.\~\-\:\+]' + extensions = r'\.tar\.(%s)' % "|".join(self.known_compressions()) + + version_filters = map ( lambda x: x % (version_chars, extensions), + ( # Debian package_.orig.tar.gz: + r'^(?P[a-z\d\.\+\-]+)_(?P%s+)\.orig%s', + # Upstream package-.tar.gz: + r'^(?P[a-zA-Z\d\.\+\-]+)-(?P[0-9]%s*)%s')) + if extra_regex: + version_filters = extra_regex + version_filters + + for filter in version_filters: + m = re.match(filter, os.path.basename(self.path)) + if m: + return (m.group('package'), m.group('version')) + + +class DscFile(object): + """Keeps all needed data read from a dscfile""" + compressions = r"(%s)" % '|'.join(UpstreamSource.known_compressions()) + pkg_re = re.compile(r'Source:\s+(?P.+)\s*') + version_re = re.compile(r'Version:\s((?P\d+)\:)?(?P[%s]+)\s*$' % debian_version_chars) + tar_re = re.compile(r'^\s\w+\s\d+\s+(?P[^_]+_[^_]+(\.orig)?\.tar\.%s)' % compressions) + diff_re = re.compile(r'^\s\w+\s\d+\s+(?P[^_]+_[^_]+\.diff.(gz|bz2))') + deb_tgz_re = re.compile(r'^\s\w+\s\d+\s+(?P[^_]+_[^_]+\.debian.tar.%s)' % compressions) + format_re = re.compile(r'Format:\s+(?P[0-9.]+)\s*') + + def __init__(self, dscfile): + self.pkg = "" + self.tgz = "" + self.diff = "" + self.deb_tgz = "" + self.pkgformat = "1.0" + self.debian_version = "" + self.upstream_version = "" + self.native = False + self.dscfile = os.path.abspath(dscfile) + + f = file(self.dscfile) + fromdir = os.path.dirname(os.path.abspath(dscfile)) + for line in f: + m = self.version_re.match(line) + if m and not self.upstream_version: + if '-' in m.group('version'): + self.debian_version = m.group('version').split("-")[-1] + self.upstream_version = "-".join(m.group('version').split("-")[0:-1]) + self.native = False + else: + self.native = True # Debian native package + self.upstream_version = m.group('version') + if m.group('epoch'): + self.epoch = m.group('epoch') + else: + self.epoch = "" + continue + m = self.pkg_re.match(line) + if m: + self.pkg = m.group('pkg') + continue + m = self.deb_tgz_re.match(line) + if m: + self.deb_tgz = os.path.join(fromdir, m.group('deb_tgz')) + continue + m = self.tar_re.match(line) + if m: + self.tgz = os.path.join(fromdir, m.group('tar')) + continue + m = self.diff_re.match(line) + if m: + self.diff = os.path.join(fromdir, m.group('diff')) + continue + m = self.format_re.match(line) + if m: + self.pkgformat = m.group('format') + continue + f.close() + + if not self.pkg: + raise GbpError, "Cannot parse package name from '%s'" % self.dscfile + elif not self.tgz: + raise GbpError, "Cannot parse archive name from '%s'" % self.dscfile + if not self.upstream_version: + raise GbpError, "Cannot parse version number from '%s'" % self.dscfile + if not self.native and not self.debian_version: + raise GbpError, "Cannot parse Debian version number from '%s'" % self.dscfile + + def _get_version(self): + version = [ "", self.epoch + ":" ][len(self.epoch) > 0] + if self.native: + version += self.upstream_version + else: + version += "%s-%s" % (self.upstream_version, self.debian_version) + return version + + version = property(_get_version) + + def __str__(self): + return "<%s object %s>" % (self.__class__.__name__, self.dscfile) + + +def parse_dsc(dscfile): + """parse dsc by creating a DscFile object""" + try: + dsc = DscFile(dscfile) + except IOError, err: + raise GbpError, "Error reading dsc file: %s" % err + + return dsc + +def parse_changelog_repo(repo, branch, filename): + """ + Parse the changelog file from given branch in the git + repository. + """ + try: + # Note that we could just pass in the branch:filename notation + # to show as well, but we want to check if the branch / filename + # exists first, so we can give a separate error from other + # repository errors. + sha = repo.rev_parse("%s:%s" % (branch, filename)) + except GitRepositoryError: + raise NoChangelogError, "Changelog %s not found in branch %s" % (filename, branch) + + lines = repo.show(sha) + return parse_changelog('\n'.join(lines)) + +def parse_changelog(contents=None, filename=None): + """ + Parse the content of a changelog file. Either contents, containing + the contents of a changelog file, or filename, pointing to a + changelog file must be passed. + + Returns: + + cp['Version']: full version string including epoch + cp['Upstream-Version']: upstream version, if not debian native + cp['Debian-Version']: debian release + cp['Epoch']: epoch, if any + cp['NoEpoch-Version']: full version string excluding epoch + """ + # Check that either contents or filename is passed (but not both) + if (not filename and not contents) or (filename and contents): + raise Exception("Either filename or contents must be passed to parse_changelog") + + # If a filename was passed, check if it exists + if filename and not os.access(filename, os.F_OK): + raise NoChangelogError, "Changelog %s not found" % (filename, ) + + # If no filename was passed, let parse_changelog read from stdin + if not filename: + filename = '-' + + # Note that if contents is None, stdin will just be closed right + # away by communicate. + cmd = subprocess.Popen(['dpkg-parsechangelog', '-l%s' % filename], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (output, errors) = cmd.communicate(contents) + if cmd.returncode: + raise ParseChangeLogError, "Failed to parse changelog. dpkg-parsechangelog said:\n%s" % (errors, ) + # Parse the result of dpkg-parsechangelog (which looks like + # email headers) + cp = email.message_from_string(output) + try: + if ':' in cp['Version']: + cp['Epoch'], cp['NoEpoch-Version'] = cp['Version'].split(':', 1) + else: + cp['NoEpoch-Version'] = cp['Version'] + if '-' in cp['NoEpoch-Version']: + cp['Upstream-Version'], cp['Debian-Version'] = cp['NoEpoch-Version'].rsplit('-', 1) + else: + cp['Debian-Version'] = cp['NoEpoch-Version'] + except TypeError: + raise ParseChangeLogError, output.split('\n')[0] + return cp + + +def orig_file(cp, compression): + """ + The name of the orig file belonging to changelog cp + + >>> orig_file({'Source': 'foo', 'Upstream-Version': '1.0'}, "bzip2") + 'foo_1.0.orig.tar.bz2' + >>> orig_file({'Source': 'bar', 'Upstream-Version': '0.0~git1234'}, "xz") + 'bar_0.0~git1234.orig.tar.xz' + """ + ext = compressor_opts[compression][1] + return "%s_%s.orig.tar.%s" % (cp['Source'], cp['Upstream-Version'], ext) + + +def is_native(cp): + """ + Is this a debian native package + + >>> is_native(dict(Version="1")) + True + >>> is_native(dict(Version="1-1")) + False + """ + return not '-' in cp['Version'] + +def is_valid_packagename(name): + "Is this a valid Debian package name?" + return packagename_re.match(name) + +def is_valid_upstreamversion(version): + "Is this a valid upstream version number?" + return upstreamversion_re.match(version) + +def get_compression(orig_file): + """ + Given an orig file return the compression used + + >>> get_compression("abc.tar.gz") + 'gzip' + >>> get_compression("abc.tar.bz2") + 'bzip2' + >>> get_compression("abc.tar.foo") + >>> get_compression("abc") + """ + try: + ext = orig_file.rsplit('.',1)[1] + except IndexError: + return None + for (c, o) in compressor_opts.iteritems(): + if o[1] == ext: + return c + return None + + +def has_epoch(cp): + """ + Does the topmost version number in the changelog contain an epoch + >>> has_epoch(dict(Epoch="1")) + True + >>> has_epoch(dict()) + False + """ + return cp.has_key("Epoch") + + +def has_orig(cp, compression, dir): + "Check if orig.tar.gz exists in dir" + try: + os.stat( os.path.join(dir, orig_file(cp, compression)) ) + except OSError: + return False + return True + +def symlink_orig(cp, compression, orig_dir, output_dir, force=False): + """ + symlink orig.tar.gz from orig_dir to output_dir + @return: True if link was created or src == dst + False in case of error or src doesn't exist + """ + orig_dir = os.path.abspath(orig_dir) + output_dir = os.path.abspath(output_dir) + + if orig_dir == output_dir: + return True + + src = os.path.join(orig_dir, orig_file(cp, compression)) + dst = os.path.join(output_dir, orig_file(cp, compression)) + if not os.access(src, os.F_OK): + return False + try: + if os.access(dst, os.F_OK) and force: + os.unlink(dst) + os.symlink(src, dst) + except OSError: + return False + return True + + +def parse_uscan(out): + """ + Parse the uscan output return (True, tarball) if a new version was + downloaded and could be located. If the tarball can't be located it returns + (True, None). Returns (False, None) if the current version is up to date. + + >>> parse_uscan("up to date") + (False, None) + >>> parse_uscan("virt-viewer_0.4.0.orig.tar.gz") + (True, '../virt-viewer_0.4.0.orig.tar.gz') + + @param out: uscan output + @type out: string + @return: status and tarball name + @rtype: tuple + """ + source = None + if "up to date" in out: + return (False, None) + else: + # Check if uscan downloaded something + for row in out.split("\n"): + # uscan >= 2.10.70 has a target element: + m = re.match(r"(.*)", row) + if m: + source = '../%s' % m.group(1) + break + elif row.startswith(''): + m = re.match(r".*symlinked ([^\s]+) to it", row) + if m: + source = "../%s" % m.group(1) + break + m = re.match(r"Successfully downloaded updated package ([^<]+)", row) + if m: + source = "../%s" % m.group(1) + break + # try to determine the already downloaded sources name + else: + d = {} + for row in out.split("\n"): + for n in ('package', 'upstream-version', 'upstream-url'): + m = re.match("<%s>(.*)" % (n,n), row) + if m: + d[n] = m.group(1) + d["ext"] = os.path.splitext(d['upstream-url'])[1] + # We want the name of the orig tarball if possible + source = "../%(package)s_%(upstream-version)s.orig.tar%(ext)s" % d + if not os.path.exists(source): + # Fall back to the sources name otherwise + source = "../%s" % d['upstream-url'].rsplit('/',1)[1] + print source + if not os.path.exists(source): + source = None + return (True, source) + + +def do_uscan(): + """invoke uscan to fetch a new upstream version""" + p = subprocess.Popen(['uscan', '--symlink', '--destdir=..', '--dehs'], stdout=subprocess.PIPE) + out = p.communicate()[0] + return parse_uscan(out) + + +def get_arch(): + pipe = subprocess.Popen(["dpkg", "--print-architecture"], shell=False, stdout=subprocess.PIPE) + arch = pipe.stdout.readline().strip() + return arch + + +def compare_versions(version1, version2): + """compares to Debian versionnumbers suitable for sort()""" + return DpkgCompareVersions()(version1, version2) + + +# vim:et:ts=4:sw=4:et:sts=4:ai:set list listchars=tab\:»·,trail\:·: diff --git a/setup.py b/setup.py index d8676b9..9a0cb59 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ # END OF COPYRIGHT # import subprocess -from setuptools import setup +from setuptools import setup, find_packages def fetch_version(): @@ -55,7 +55,7 @@ setup(name = "gbp", 'bin/gbp-clone', 'bin/gbp-create-remote-repo', 'bin/git-pbuilder'], - packages = [ 'gbp', 'gbp.scripts', 'gbp.git' ], + packages = find_packages(), data_files = [("/etc/git-buildpackage/", ["gbp.conf"]),], setup_requires=['nose>=1.0', 'coverage>=3.4'], ) diff --git a/tests/06_test_upstream_source.py b/tests/06_test_upstream_source.py index 13e0e95..e414fa4 100644 --- a/tests/06_test_upstream_source.py +++ b/tests/06_test_upstream_source.py @@ -49,7 +49,7 @@ class TestTar(unittest.TestCase): repacked = source.pack(target) self.assertEqual(repacked.is_orig, True) self.assertEqual(repacked.is_dir, False) - self._check_tar(repacked, ["gbp/deb.py", "gbp/__init__.py"]) + self._check_tar(repacked, ["gbp/errors.py", "gbp/__init__.py"]) def test_pack_filtered(self): """Check if filtering out files works""" @@ -59,7 +59,7 @@ class TestTar(unittest.TestCase): repacked = source.pack(target, ["__init__.py"]) self.assertEqual(repacked.is_orig, True) self.assertEqual(repacked.is_dir, False) - self._check_tar(repacked, ["gbp/deb.py"], + self._check_tar(repacked, ["gbp/errors.py"], ["gbp/__init__.py"]) -- cgit v1.2.3