Coverage for distro_tracker/core/utils/compression.py: 100%
47 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-01-12 09:15 +0000
1# Copyright 2017 The Distro Tracker Developers
2# See the COPYRIGHT file at the top-level directory of this distribution and
3# at https://deb.li/DTAuthors
4#
5# This file is part of Distro Tracker. It is subject to the license terms
6# in the LICENSE file found in the top-level directory of this
7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
8# including this file, may be copied, modified, propagated, or distributed
9# except according to the terms contained in the LICENSE file.
10"""
11Utilities for handling compression
12"""
13import contextlib
14import io
def guess_compression_method(filepath):
    """
    Guess the compression method of a file from its extension.

    The comparison is case-insensitive.

    :param filepath: Path (or plain file name) of the file to inspect.
    :type filepath: str
    :returns: ``"gzip"``, ``"bzip2"`` or ``"xz"`` when the path ends with
        ``.gz``, ``.bz2`` or ``.xz`` respectively, ``None`` otherwise.
    """
    lowered = filepath.lower()

    # (suffix, method) pairs, checked in order; the suffixes are mutually
    # exclusive so at most one of them can match.
    known_suffixes = (
        (".gz", "gzip"),
        (".bz2", "bzip2"),
        (".xz", "xz"),
    )

    return next(
        (name for suffix, name in known_suffixes if lowered.endswith(suffix)),
        None,
    )
@contextlib.contextmanager
def get_uncompressed_stream(input_stream, compression="auto",
                            text=False, encoding='utf-8'):
    """
    Context manager yielding a file-like object (aka stream) providing an
    uncompressed version of the content read on the input stream provided.

    Both the yielded stream and ``input_stream`` are closed when the
    ``with`` block is left, including when the block raises an exception.

    :param input_stream: The file-like object providing compressed data.
    :param compression: The compression type (``"gzip"``, ``"bzip2"``,
        ``"xz"`` or ``None`` for uncompressed data). Specify "auto" to let
        the function guess it out of the associated filename (the
        input_stream needs to have a name attribute, otherwise a ValueError
        is raised). When the name has no recognized extension, the data is
        treated as uncompressed.
    :type compression: str
    :param text: If True, open the stream as a text stream.
    :type text: boolean
    :param encoding: Encoding to use to decode the text.
    :type encoding: str
    :raises ValueError: if ``compression`` is "auto" and ``input_stream``
        has no ``name`` attribute.
    :raises NotImplementedError: if the compression method is not one of
        the supported values.
    """
    if compression == "auto":  # Try to guess compression method if possible
        if hasattr(input_stream, 'name'):
            compression = guess_compression_method(input_stream.name)
        else:
            raise ValueError("Can't retrieve a name out of %r" % input_stream)

    if text:
        kwargs = {'mode': 'rt', 'encoding': encoding}
    else:
        kwargs = {'mode': 'rb'}

    # The decompression modules are imported lazily so that only the one
    # actually needed gets loaded.
    if compression == "gzip":
        import gzip
        stream = gzip.open(filename=input_stream, **kwargs)
    elif compression == "bzip2":
        import bz2
        stream = bz2.open(filename=input_stream, **kwargs)
    elif compression == "xz":
        import lzma
        stream = lzma.open(filename=input_stream, **kwargs)
    elif compression is None:
        if text:
            stream = io.TextIOWrapper(input_stream, encoding=encoding)
        else:
            stream = input_stream
    else:
        raise NotImplementedError(
            "Unknown compression method: %r" % compression)

    # Context managers exit in reverse order: the (de)compressing stream is
    # closed first, then the underlying stream to avoid leaking open files.
    # Using closing() guarantees both happen even if the caller's block
    # raises or if closing the first stream fails.
    with contextlib.closing(input_stream), contextlib.closing(stream):
        yield stream
def get_compressor_factory(compression):
    """
    Look up the "open" function matching a compression method.

    The function returned shares the API of gzip.open, lzma.open and
    bz2.open. Pass mode='wb' or mode='wt' to it in order to obtain a
    file-like object that compresses the data written to it.

    .. code-block:: python3

        compressor_factory = get_compressor_factory("xz")
        compressor = compressor_factory(path, mode="wb")
        compressor.write(b"Test")
        compressor.close()

    :param compression: The compression method to use: ``"gzip"``,
        ``"bzip2"``, ``"xz"``, or ``None`` for no compression (the builtin
        ``open`` is returned in that case).
    :type compression: str
    :raises NotImplementedError: if the compression method is unknown.
    """
    # Each module is imported only when its method is requested.
    if compression == "gzip":
        import gzip as gzip_module
        return gzip_module.open

    if compression == "bzip2":
        import bz2 as bz2_module
        return bz2_module.open

    if compression == "xz":
        import lzma as lzma_module
        return lzma_module.open

    if compression is None:
        return open

    message = "Unknown compression method: %r" % compression
    raise NotImplementedError(message)