Coverage for qubalab/images/qupath_server.py: 95%

64 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-10-07 15:29 +0000

1import numpy as np 

2import tempfile 

3import os 

4from typing import Optional 

5from py4j.java_gateway import JavaGateway, JavaObject 

6from urllib.parse import urlparse, unquote 

7from .image_server import ImageServer 

8from .metadata.image_metadata import ImageMetadata 

9from .metadata.image_shape import ImageShape 

10from .metadata.image_channel import ImageChannel 

11from .metadata.pixel_calibration import PixelCalibration, PixelLength 

12from .region_2d import Region2D 

13from .utils import base64_to_image, bytes_to_image 

14from ..qupath import qupath_gateway 

15 

16 

class QuPathServer(ImageServer):
    """
    An ImageServer that communicates with a QuPath ImageServer through a Gateway.

    Closing this server won't close the underlying QuPath ImageServer.
    """

    def __init__(
        self,
        gateway: Optional[JavaGateway] = None,
        qupath_server: Optional[JavaObject] = None,
        pixel_access: str = "base_64",
        **kwargs,
    ):
        """
        :param gateway: the gateway between Python and QuPath. If not specified, the default gateway is used
        :param qupath_server: a Java object representing the QuPath ImageServer. If not specified, the currently
                              opened in QuPath ImageServer is used
        :param pixel_access: how to send pixel values from QuPath to Python. Can be 'base_64' to convert pixels to
                             base 64 encoded text, 'bytes' to convert pixels to bytes, or 'temp_files' to use
                             temporary files
        :param resize_method: the resampling method to use when resizing the image for downsampling. Bicubic by
                              default. Forwarded to the parent ImageServer through ``**kwargs``
        :raises ValueError: when pixel_access has an unexpected value
        """
        super().__init__(**kwargs)

        available_pixel_access = ["base_64", "bytes", "temp_files"]
        if pixel_access not in available_pixel_access:
            raise ValueError(
                f"The provided pixel access ({pixel_access}) is not one of {str(available_pixel_access)}"
            )

        self._gateway = (
            qupath_gateway.get_default_gateway() if gateway is None else gateway
        )
        # Look the server up through the gateway that was just resolved, so that the
        # server and the gateway stored on this instance always belong together.
        self._qupath_server = (
            qupath_gateway.get_current_image_data(self._gateway).getServer()
            if qupath_server is None
            else qupath_server
        )
        self._pixel_access = pixel_access

    def close(self):
        """
        Do nothing: the underlying QuPath ImageServer is deliberately left open.
        """
        pass

    def _build_metadata(self) -> ImageMetadata:
        """
        Query the QuPath ImageServer and convert its metadata to an ImageMetadata.

        One ImageShape is created per resolution level reported by QuPath; the number of
        channels, z-slices and timepoints is the same for every level.
        """
        server = self._qupath_server
        qupath_metadata = server.getMetadata()

        return ImageMetadata(
            path=QuPathServer._find_qupath_server_path(server),
            name=qupath_metadata.getName(),
            shapes=tuple(
                ImageShape(
                    x=level.getWidth(),
                    y=level.getHeight(),
                    c=server.nChannels(),
                    z=server.nZSlices(),
                    t=server.nTimepoints(),
                )
                for level in qupath_metadata.getLevels()
            ),
            pixel_calibration=QuPathServer._find_qupath_server_pixel_calibration(
                server
            ),
            is_rgb=server.isRGB(),
            # The lower-cased text form of the QuPath pixel type is used as the numpy dtype name
            dtype=np.dtype(server.getPixelType().toString().lower()),
            channels=tuple(
                ImageChannel(c.getName(), QuPathServer._unpack_color(c.getColor()))
                for c in qupath_metadata.getChannels()
            ),
            downsamples=tuple(server.getPreferredDownsamples()),
        )

    def _read_block(self, level: int, region: Region2D) -> np.ndarray:
        """
        Read the pixels of a region at a given resolution level from QuPath.

        :param level: the resolution level to read. A negative value is counted from the
                      last level (-1 being the last one)
        :param region: the region to read, expressed in the coordinates of the requested
                       level; it is scaled by the level downsample before being sent to QuPath
        :return: the image returned by the pixel access method chosen at construction
        """
        if level < 0:
            level = len(self.metadata.downsamples) + level
        downsample = self._qupath_server.getDownsampleForResolution(level)

        request = self._gateway.jvm.qupath.lib.regions.RegionRequest.createInstance(
            self._qupath_server.getPath(),
            downsample,
            int(round(region.x * downsample)),
            int(round(region.y * downsample)),
            int(round(region.width * downsample)),
            int(round(region.height * downsample)),
            region.z,
            region.t,
        )

        # The decoding parameters are the same for every pixel access method
        is_rgb = self.metadata.is_rgb
        shape = ImageShape(region.width, region.height, c=self.metadata.n_channels)

        if self._pixel_access == "temp_files":
            return self._read_block_from_temp_file(request, is_rgb, shape)

        image_format = "png" if is_rgb else "imagej tiff"
        entry_point = self._gateway.entry_point

        if self._pixel_access == "bytes":
            return bytes_to_image(
                entry_point.getImageBytes(self._qupath_server, request, image_format),
                is_rgb,
                shape,
            )

        return base64_to_image(
            entry_point.getImageBase64(self._qupath_server, request, image_format),
            is_rgb,
            shape,
        )

    def _read_block_from_temp_file(
        self, request: JavaObject, is_rgb: bool, shape: ImageShape
    ) -> np.ndarray:
        """
        Ask QuPath to write the requested region to a temporary TIFF file and read it back.

        The temporary file is removed afterwards, even if writing or decoding fails.
        """
        # mkstemp returns an already opened OS-level file descriptor together with the path.
        # Only the path is needed here, so the descriptor is closed right away: leaving it
        # open leaks one descriptor per call and can prevent the file from being deleted.
        fd, temp_path = tempfile.mkstemp(prefix="qubalab-", suffix=".tif")
        os.close(fd)

        try:
            self._gateway.entry_point.writeImageRegion(
                self._qupath_server, request, temp_path
            )
            return bytes_to_image(temp_path, is_rgb, shape)
        finally:
            try:
                os.remove(temp_path)
            except OSError:
                # Best-effort cleanup: the removal can fail (for example on Windows when
                # the file is still held open elsewhere). A leftover temporary file is
                # harmless, so it must not mask the result or the original error.
                pass

    @staticmethod
    def _find_qupath_server_path(qupath_server: JavaObject) -> str:
        """
        Try to get the file path for a java object representing an ImageServer.
        This can be useful to get direct access to an image file, rather than via QuPath.

        :param qupath_server: a Java object representing the QuPath ImageServer
        :return: the percent-decoded path of the server URI when the server has exactly one
                 URI and that URI uses the 'file' scheme; the QuPath server path otherwise
        """
        uris = tuple(str(u) for u in qupath_server.getURIs())
        if len(uris) == 1:
            parsed = urlparse(uris[0])
            if parsed.scheme == "file":
                return unquote(parsed.path)
        return qupath_server.getPath()

    @staticmethod
    def _find_qupath_server_pixel_calibration(
        qupath_server: JavaObject,
    ) -> PixelCalibration:
        """
        Convert the pixel calibration of a QuPath ImageServer to a PixelCalibration.

        :param qupath_server: a Java object representing the QuPath ImageServer
        :return: a calibration in microns when QuPath reports a pixel size in microns (the
                 z-spacing falls back to a default PixelLength when QuPath has none), or a
                 default PixelCalibration otherwise
        """
        pixel_calibration = qupath_server.getPixelCalibration()

        if not pixel_calibration.hasPixelSizeMicrons():
            return PixelCalibration()

        if pixel_calibration.hasZSpacingMicrons():
            z_length = PixelLength.create_microns(
                pixel_calibration.getZSpacingMicrons()
            )
        else:
            z_length = PixelLength()

        return PixelCalibration(
            PixelLength.create_microns(pixel_calibration.getPixelWidthMicrons()),
            PixelLength.create_microns(pixel_calibration.getPixelHeightMicrons()),
            z_length,
        )

    @staticmethod
    def _unpack_color(rgb: int) -> tuple[float, float, float]:
        """
        Split a packed integer color into its red, green and blue components.

        Red is read from bits 16-23, green from bits 8-15 and blue from bits 0-7; any
        higher bits are ignored.

        :param rgb: the packed color
        :return: the (red, green, blue) components, each scaled between 0 and 1
        """
        r = (rgb >> 16) & 255
        g = (rgb >> 8) & 255
        b = rgb & 255
        return r / 255.0, g / 255.0, b / 255.0