1# Copyright 2017 The Distro Tracker Developers 

2# See the COPYRIGHT file at the top-level directory of this distribution and 

3# at https://deb.li/DTAuthors 

4# 

5# This file is part of Distro Tracker. It is subject to the license terms 

6# in the LICENSE file found in the top-level directory of this 

7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker, 

8# including this file, may be copied, modified, propagated, or distributed 

9# except according to the terms contained in the LICENSE file. 

10""" 

11Utilities for handling compression 

12""" 

13import io 

14 

15 

def guess_compression_method(filepath):
    """
    Given filepath, tries to determine the compression of the file.

    The guess relies only on the extension at the end of the path, compared
    case-insensitively. The file itself is never opened.

    :param filepath: The path (or name) of the file. ``str``, ``bytes`` and
        :class:`os.PathLike` objects are accepted.
    :returns: ``"gzip"``, ``"bzip2"`` or ``"xz"`` when the extension is
        recognized, ``None`` otherwise.
    :rtype: str or None
    """
    import os

    # os.fsdecode() turns str, bytes and path-like objects into a str so that
    # the suffix comparison below works the same way for all of them.
    filename = os.fsdecode(filepath).lower()

    extensions_to_method = {
        ".gz": "gzip",
        ".bz2": "bzip2",
        ".xz": "xz",
    }

    for ext, method in extensions_to_method.items():
        if filename.endswith(ext):
            return method

    return None

32 

33 

def get_uncompressed_stream(input_stream, compression="auto",
                            text=False, encoding='utf-8'):
    """
    Returns a file-like object (aka stream) providing an uncompressed
    version of the content read on the input stream provided.

    :param input_stream: The file-like object providing compressed data.
    :param compression: The compression type. Specify "auto" to let the
        function guess it out of the associated filename (the input_stream
        needs to have a name attribute holding a path, otherwise a
        ValueError is raised). The other supported values are "gzip",
        "bzip2", "xz" and None (the data is not compressed).
    :type compression: str
    :param text: If True, open the stream as a text stream.
    :type text: boolean
    :param encoding: Encoding to use to decode the text.
    :type encoding: str
    :returns: A stream opened for reading. When the data is not compressed
        and text is False, input_stream itself is returned.
    :raises ValueError: If compression is "auto" and no usable filename can
        be retrieved from input_stream.
    :raises NotImplementedError: If the compression method is not supported.
    """

    if compression == "auto":  # Try to guess compression method if possible
        import os

        try:
            # A stream built on top of a file descriptor exposes an integer
            # as name: os.fsdecode() rejects it with a TypeError while it
            # normalizes str, bytes and path-like names to str.
            filename = os.fsdecode(input_stream.name)
        except (AttributeError, TypeError):
            raise ValueError(
                "Can't retrieve a name out of %r" % (input_stream,)
            ) from None
        compression = guess_compression_method(filename)

    if text:
        kwargs = {'mode': 'rt', 'encoding': encoding}
    else:
        kwargs = {'mode': 'rb'}

    # The compression modules are imported lazily, only when really needed.
    if compression == "gzip":
        import gzip
        return gzip.open(filename=input_stream, **kwargs)
    elif compression == "bzip2":
        import bz2
        return bz2.open(filename=input_stream, **kwargs)
    elif compression == "xz":
        import lzma
        return lzma.open(filename=input_stream, **kwargs)
    elif compression is None:
        if text:
            # Uncompressed data only needs to be decoded.
            return io.TextIOWrapper(input_stream, encoding=encoding)
        else:
            return input_stream
    else:
        raise NotImplementedError(
            "Unknown compression method: %r" % compression)

79 

80 

def get_compressor_factory(compression):
    """
    Gives back the callable to use to create a file-like object compressing
    the data written to it. That callable is gzip.open, bz2.open, lzma.open
    or the builtin open, which all share the same API. Do not forget to
    pass mode='wb' or mode='wt' to it to get an object opened in write mode.

    .. code-block:: python3

        compressor_factory = get_compressor_factory("xz")
        compressor = compressor_factory(path, mode="wb")
        compressor.write(b"Test")
        compressor.close()

    :param compression: The compression method to use: "gzip", "bzip2",
        "xz" or None for no compression at all.
    :type compression: str
    :raises NotImplementedError: If the compression method is unknown.
    """
    if compression is None:
        # Nothing to compress: the builtin open() does the job.
        return open

    # Only the module backing the requested method gets imported.
    if compression == "gzip":
        import gzip as codec
    elif compression == "bzip2":
        import bz2 as codec
    elif compression == "xz":
        import lzma as codec
    else:
        raise NotImplementedError(
            "Unknown compression method: %r" % compression)

    return codec.open