Coverage for distro_tracker/core/utils/compression.py: 100%

47 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-01-12 09:15 +0000

1# Copyright 2017 The Distro Tracker Developers 

2# See the COPYRIGHT file at the top-level directory of this distribution and 

3# at https://deb.li/DTAuthors 

4# 

5# This file is part of Distro Tracker. It is subject to the license terms 

6# in the LICENSE file found in the top-level directory of this 

7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker, 

8# including this file, may be copied, modified, propagated, or distributed 

9# except according to the terms contained in the LICENSE file. 

10""" 

11Utilities for handling compression 

12""" 

13import contextlib 

14import io 

15 

16 

def guess_compression_method(filepath):
    """
    Guess the compression method of a file from its filename extension.

    The comparison is case-insensitive, so ``"Sources.GZ"`` is recognised
    as gzip.

    :param filepath: Name or path of the file. ``str``, ``bytes`` and
        ``os.PathLike`` objects are accepted.
    :returns: ``"gzip"``, ``"bzip2"`` or ``"xz"`` when the name ends with
        ``.gz``, ``.bz2`` or ``.xz`` respectively, ``None`` otherwise.
    :raises TypeError: If ``filepath`` is not a ``str``, ``bytes`` or
        ``os.PathLike`` object.
    """
    import os

    # Normalise bytes and path-like names to str so that the suffix
    # comparison below works for every kind of filename; plain str values
    # are returned unchanged by os.fsdecode().
    filepath = os.fsdecode(filepath).lower()

    extensions_to_method = {
        ".gz": "gzip",
        ".bz2": "bzip2",
        ".xz": "xz",
    }

    for ext, method in extensions_to_method.items():
        if filepath.endswith(ext):
            return method

    return None

33 

34 

@contextlib.contextmanager
def get_uncompressed_stream(input_stream, compression="auto",
                            text=False, encoding='utf-8'):
    """
    Context manager yielding a file-like object (aka stream) that provides
    an uncompressed version of the content read on the input stream
    provided.

    On exit, both the yielded stream and ``input_stream`` are closed, even
    when the body of the ``with`` statement raises an exception.

    :param input_stream: The file-like object providing compressed data.
    :param compression: The compression type ("gzip", "bzip2", "xz" or
        ``None`` for uncompressed data). Specify "auto" to let the function
        guess it out of the associated filename (the input_stream needs to
        have a name attribute, otherwise a ValueError is raised).
    :type compression: str
    :param text: If True, open the stream as a text stream.
    :type text: boolean
    :param encoding: Encoding to use to decode the text.
    :type encoding: str
    :raises ValueError: If compression is "auto" and ``input_stream`` has
        no ``name`` attribute.
    :raises NotImplementedError: If the compression method is unknown.
    """

    if compression == "auto":  # Try to guess compression method if possible
        if hasattr(input_stream, 'name'):
            compression = guess_compression_method(input_stream.name)
        else:
            raise ValueError("Can't retrieve a name out of %r" % input_stream)

    if text:
        kwargs = {'mode': 'rt', 'encoding': encoding}
    else:
        kwargs = {'mode': 'rb'}

    # The decompression modules are imported lazily so that only the one
    # actually needed gets loaded.
    if compression == "gzip":
        import gzip
        stream = gzip.open(filename=input_stream, **kwargs)
    elif compression == "bzip2":
        import bz2
        stream = bz2.open(filename=input_stream, **kwargs)
    elif compression == "xz":
        import lzma
        stream = lzma.open(filename=input_stream, **kwargs)
    elif compression is None:
        if text:
            stream = io.TextIOWrapper(input_stream, encoding=encoding)
        else:
            stream = input_stream
    else:
        raise NotImplementedError(
            "Unknown compression method: %r" % compression)

    try:
        yield stream
    finally:
        # Run the cleanup even when the caller's block raised, and make
        # sure a failure while closing the wrapper does not prevent the
        # underlying stream from being closed (avoids leaking open files).
        try:
            stream.close()
        finally:
            input_stream.close()

87 

88 

def get_compressor_factory(compression):
    """
    Look up the ``open``-like callable matching a compression method.

    The callable returned shares its API with gzip.open, bz2.open and
    lzma.open (the builtin ``open`` is returned when no compression is
    requested). Pass mode='wb' or mode='wt' to it in order to obtain a
    file-like object that compresses the data written to it.

    .. code-block:: python3

        compressor_factory = get_compressor_factory("xz")
        compressor = compressor_factory(path, mode="wb")
        compressor.write(b"Test")
        compressor.close()

    :param compression: The compression method to use: "gzip", "bzip2",
        "xz" or ``None`` for no compression.
    :type compression: str
    :raises NotImplementedError: If the compression method is unknown.
    """
    # Each compression module is only imported when it is requested.
    if compression == "gzip":
        from gzip import open as opener
    elif compression == "bzip2":
        from bz2 import open as opener
    elif compression == "xz":
        from lzma import open as opener
    elif compression is None:
        opener = open
    else:
        raise NotImplementedError(
            "Unknown compression method: %r" % compression)

    return opener