1# Copyright 2017 The Distro Tracker Developers
2# See the COPYRIGHT file at the top-level directory of this distribution and
3# at https://deb.li/DTAuthors
4#
5# This file is part of Distro Tracker. It is subject to the license terms
6# in the LICENSE file found in the top-level directory of this
7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
8# including this file, may be copied, modified, propagated, or distributed
9# except according to the terms contained in the LICENSE file.
10"""
11Utilities for handling compression
12"""
13import io
def guess_compression_method(filepath):
    """
    Guess the compression method of a file from its name.

    Only the trailing extension is looked at, and the comparison is
    case-insensitive.

    :param filepath: Path (or bare name) of the file. Besides ``str``,
        ``bytes`` and :class:`os.PathLike` objects (such as
        :class:`pathlib.Path`) are accepted, because the ``name`` attribute
        of a file object opened from such a path is that very object.
    :returns: ``"gzip"``, ``"bzip2"`` or ``"xz"``, or ``None`` when the
        extension does not match any known compression method.
    """
    import os

    # Normalise bytes and path-like names to str. Any other type is left
    # untouched so that it keeps failing exactly as it did before.
    if isinstance(filepath, (bytes, os.PathLike)):
        filepath = os.fsdecode(filepath)

    filepath = filepath.lower()

    extensions_to_method = {
        ".gz": "gzip",
        ".bz2": "bzip2",
        ".xz": "xz",
    }

    for ext, method in extensions_to_method.items():
        if filepath.endswith(ext):
            return method

    return None
def get_uncompressed_stream(input_stream, compression="auto",
                            text=False, encoding='utf-8'):
    """
    Wrap a possibly compressed stream so that reading from the returned
    file-like object yields the uncompressed content.

    :param input_stream: File-like object the compressed data is read from.
    :param compression: Name of the compression method: ``"gzip"``,
        ``"bzip2"``, ``"xz"``, or ``None`` for data that is not compressed.
        With ``"auto"`` the method is guessed from ``input_stream.name``.
    :type compression: str
    :param text: Whether the returned stream yields text instead of bytes.
    :type text: boolean
    :param encoding: Encoding used to decode the data when ``text`` is True.
    :type encoding: str
    :raises ValueError: if ``compression`` is ``"auto"`` and the input
        stream has no ``name`` attribute.
    :raises NotImplementedError: if the compression method is not supported.
    """
    if compression == "auto":
        # Guessing is only possible when the stream exposes a file name.
        if not hasattr(input_stream, 'name'):
            raise ValueError("Can't retrieve a name out of %r" % input_stream)
        compression = guess_compression_method(input_stream.name)

    open_options = {'mode': 'rt' if text else 'rb'}
    if text:
        open_options['encoding'] = encoding

    # Pick the decompressing opener; the modules are imported lazily so that
    # only the one actually needed gets loaded.
    opener = None
    if compression == "gzip":
        import gzip
        opener = gzip.open
    elif compression == "bzip2":
        import bz2
        opener = bz2.open
    elif compression == "xz":
        import lzma
        opener = lzma.open
    elif compression is not None:
        raise NotImplementedError(
            "Unknown compression method: %r" % compression)

    if opener is not None:
        return opener(filename=input_stream, **open_options)

    # No compression: hand the stream back, decoding it only if asked to.
    if text:
        return io.TextIOWrapper(input_stream, encoding=encoding)
    return input_stream
def get_compressor_factory(compression):
    """
    Look up the ``open``-like function matching a compression method.

    The function returned shares its API with gzip.open, bz2.open and
    lzma.open (for ``None`` it is the builtin ``open``). Call it with
    mode='wb' or mode='wt' to obtain a file-like object that compresses
    whatever is written to it.

    .. code-block:: python3

        compressor_factory = get_compressor_factory("xz")
        compressor = compressor_factory(path, mode="wb")
        compressor.write(b"Test")
        compressor.close()

    :param compression: The compression method to use: ``"gzip"``,
        ``"bzip2"``, ``"xz"``, or ``None`` for no compression.
    :type compression: str
    :raises NotImplementedError: if the compression method is not supported.
    """
    # Each module is imported only when its method is requested.
    if compression == "gzip":
        from gzip import open as factory
    elif compression == "bzip2":
        from bz2 import open as factory
    elif compression == "xz":
        from lzma import open as factory
    elif compression is None:
        factory = open
    else:
        raise NotImplementedError(
            "Unknown compression method: %r" % compression)
    return factory