Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
corpnewt
GitHub Repository: corpnewt/gibMacOS
Path: blob/master/Scripts/disk.py
175 views
1
import subprocess, plistlib, sys, os, time, json
2
sys.path.append(os.path.abspath(os.path.dirname(os.path.realpath(__file__))))
3
import run
4
if sys.version_info < (3,0):
5
# Force use of StringIO instead of cStringIO as the latter
6
# has issues with Unicode strings
7
from StringIO import StringIO
8
9
class Disk:
    """Thin wrapper around the macOS ``diskutil`` command line tool.

    The parsed output of ``diskutil list -plist`` is cached in ``self.disks``,
    the plain text output of ``diskutil list`` in ``self.disk_text`` and the
    parsed output of ``diskutil apfs list -plist`` in ``self.apfs``.  Most
    lookup methods accept a mount point, volume name, UUID or device
    identifier and resolve it through ``get_identifier()`` first; they return
    ``None`` when the disk cannot be resolved.
    """

    def __init__(self):
        self.r = run.Run()
        self.diskutil = self.get_diskutil()
        # Query sw_vers once and derive both version strings from the result
        self.full_os_version = self.r.run({"args":["sw_vers", "-productVersion"]})[0]
        self.os_version = ".".join(self.full_os_version.split(".")[:2])
        if len(self.full_os_version.split(".")) < 3:
            # Add .0 in case of 10.14
            self.full_os_version += ".0"
        # From this OS version onward, mounting the listed content types uses sudo
        self.sudo_mount_version = "10.13.6"
        self.sudo_mount_types = ["efi"]
        self.apfs = {}
        self._update_disks()

    def _get_str(self, val):
        """Return val as a native string.

        Python 2 unicode is encoded to utf-8 and Python 3 bytes are decoded
        from utf-8; anything else goes through str().
        """
        if sys.version_info < (3,0):
            if isinstance(val, unicode):
                return val.encode("utf-8")
        elif isinstance(val, bytes):
            # bytes has no encode() on Python 3 - it must be decoded instead
            return val.decode("utf-8")
        return str(val)

    def _get_plist(self, s):
        """Parse plist text s and return its root object.

        Best effort: any parsing error is printed and an empty dict is
        returned instead.
        """
        p = {}
        try:
            if sys.version_info >= (3, 0):
                # plistlib.loads() wants bytes - only encode text input
                if not isinstance(s, bytes):
                    s = s.encode("utf-8")
                p = plistlib.loads(s)
            else:
                # We avoid using readPlistFromString() as that uses
                # cStringIO and fails when Unicode strings are detected
                # Don't subclass - keep the parser local
                from xml.parsers.expat import ParserCreate
                # Create a new PlistParser object - then we need to set up
                # the values and parse.
                pa = plistlib.PlistParser()
                # We also monkey patch this to encode unicode as utf-8
                def end_string():
                    d = pa.getData()
                    if isinstance(d, unicode):
                        d = d.encode("utf-8")
                    pa.addObject(d)
                pa.end_string = end_string
                parser = ParserCreate()
                parser.StartElementHandler = pa.handleBeginElement
                parser.EndElementHandler = pa.handleEndElement
                parser.CharacterDataHandler = pa.handleData
                if isinstance(s, unicode):
                    # Encode unicode -> string; use utf-8 for safety
                    s = s.encode("utf-8")
                # Parse the string
                parser.Parse(s, 1)
                p = pa.root
        except Exception as e:
            # Deliberately best effort - report the problem and fall back to {}
            print(e)
        return p

    def _compare_versions(self, vers1, vers2, pad = -1):
        """Compare two period separated version strings.

        Returns True if vers1 < vers2, False if vers1 > vers2 and None if
        they are equal.  Non-digit characters are dropped from each part;
        parts that are missing or contain no digits count as pad.
        """
        # Sanitize the pad
        if type(pad) is not int:
            pad = -1

        def to_ints(vers):
            # Reduce each part to its digits, substituting pad when none remain
            values = []
            for part in str(vers).split("."):
                digits = "".join(c for c in part if c.isdigit())
                values.append(int(digits) if digits else pad)
            return values

        v1_parts = to_ints(vers1)
        v2_parts = to_ints(vers2)

        # Equalize lengths with the numeric pad itself.  Padding with str(pad)
        # and digit-filtering afterwards would strip the sign of a negative pad.
        longest = max(len(v1_parts), len(v2_parts))
        v1_parts.extend([pad] * (longest - len(v1_parts)))
        v2_parts.extend([pad] * (longest - len(v2_parts)))

        for v1, v2 in zip(v1_parts, v2_parts):
            if v1 < v2:
                return True
            if v1 > v2:
                return False
        # Never differed - must be equal
        return None

    def update(self):
        """Refresh the cached disk information."""
        self._update_disks()

    def _update_disks(self):
        """Reload self.disks, self.disk_text and self.apfs from diskutil."""
        self.disks = self.get_disks()
        self.disk_text = self.get_disk_text()
        # APFS info is only gathered when the OS version is above 10.12
        if self._compare_versions("10.12", self.os_version):
            self.apfs = self.get_apfs()
        else:
            self.apfs = {}

    def get_diskutil(self):
        """Return the first line printed by ``which diskutil``."""
        found = self.r.run({"args":["which", "diskutil"]})[0]
        return found.split("\n")[0].split("\r")[0]

    def get_disks(self):
        """Return the parsed output of ``diskutil list -plist``."""
        disk_list = self.r.run({"args":[self.diskutil, "list", "-plist"]})[0]
        return self._get_plist(disk_list)

    def get_disk_text(self):
        """Return the plain text output of ``diskutil list``."""
        return self.r.run({"args":[self.diskutil, "list"]})[0]

    def get_disk_info(self, disk):
        """Return the parsed ``diskutil info -plist`` output for disk, or None."""
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        disk_list = self.r.run({"args":[self.diskutil, "info", "-plist", disk_id]})[0]
        return self._get_plist(disk_list)

    def get_disk_fs(self, disk):
        """Return the "FilesystemName" entry of the disk's info, or None."""
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        return self.get_disk_info(disk_id).get("FilesystemName", None)

    def get_disk_fs_type(self, disk):
        """Return the "FilesystemType" entry of the disk's info, or None."""
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        return self.get_disk_info(disk_id).get("FilesystemType", None)

    def get_apfs(self):
        """Return the parsed output of ``diskutil apfs list -plist``.

        Returns an empty dict when the command exits with a non-zero code.
        """
        output = self.r.run({"args":"echo y | " + self.diskutil + " apfs list -plist", "shell" : True})
        if not output[2] == 0:
            # Error getting apfs info - return an empty dict
            return {}
        disk_list = output[0]
        p_list = disk_list.split("<?xml")
        if len(p_list) > 1:
            # We had text before the start - get only the plist info
            disk_list = "<?xml" + p_list[-1]
        return self._get_plist(disk_list)

    def is_apfs(self, disk):
        """Return whether disk is an entry with APFS volumes, or one of its volumes.

        Returns None when the disk cannot be resolved.
        """
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        target = disk_id.lower()
        for d in self.disks.get("AllDisksAndPartitions", []):
            if "APFSVolumes" not in d:
                continue
            if d.get("DeviceIdentifier", "").lower() == target:
                return True
            for a in d.get("APFSVolumes", []):
                if a.get("DeviceIdentifier", "").lower() == target:
                    return True
        return False

    def _partition_has_content(self, disk, content):
        """Return whether the partition matching disk has the given Content.

        content must be lowercase.  Only "Partitions" entries are checked.
        Returns None when the disk cannot be resolved and False when no
        partition matches it.
        """
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        target = disk_id.lower()
        for d in self.disks.get("AllDisksAndPartitions", []):
            # Only check partitions
            for p in d.get("Partitions", []):
                if target == p.get("DeviceIdentifier", "").lower():
                    return p.get("Content", "").lower() == content
        return False

    def is_apfs_container(self, disk):
        """Return whether that specific partition has Content "Apple_APFS"."""
        return self._partition_has_content(disk, "apple_apfs")

    def is_cs_container(self, disk):
        """Return whether that specific partition has Content "Apple_CoreStorage"."""
        return self._partition_has_content(disk, "apple_corestorage")

    def is_core_storage(self, disk):
        """Return whether a "Logical Volume on" store is found for disk.

        Returns None when the disk cannot be resolved.
        """
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        return bool(self._get_physical_disk(disk_id, "Logical Volume on "))

    def get_identifier(self, disk):
        """Resolve disk to a device identifier.

        disk may be a device identifier (optionally prefixed with /dev/ or
        /dev/r), volume name, volume UUID, disk UUID or mount point; matching
        is case-insensitive.  Returns None when nothing matches.
        """
        if not disk or not len(self._get_str(disk)):
            return None
        disk = self._get_str(disk).lower()
        if disk.startswith("/dev/r"):
            disk = disk[len("/dev/r"):]
        elif disk.startswith("/dev/"):
            disk = disk[len("/dev/"):]
        if disk in self.disks.get("AllDisks", []):
            return disk
        keys = ("DeviceIdentifier", "VolumeName", "VolumeUUID", "DiskUUID", "MountPoint")
        for d in self.disks.get("AllDisksAndPartitions", []):
            # APFS volumes are checked before regular partitions
            for a in list(d.get("APFSVolumes", [])) + list(d.get("Partitions", [])):
                if any(disk == self._get_str(a.get(x, "")).lower() for x in keys):
                    return a.get("DeviceIdentifier", None)
        # At this point, we didn't find it
        return None

    def get_top_identifier(self, disk):
        """Return the whole-disk identifier for disk (disk0s1 -> disk0), or None."""
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        # Hide the "s" in "disk" so splitting on "s" only cuts off the slice part
        return disk_id.replace("disk", "didk").split("s")[0].replace("didk", "disk")

    def _get_physical_disk(self, disk, search_term):
        """Return the first store listed after search_term that has an EFI.

        Scans the section of self.disk_text that belongs to disk's whole-disk
        identifier for the first line containing search_term, then returns
        the first store named on that line for which get_efi() finds an EFI
        partition.  Returns None if nothing qualifies.
        """
        # Change disk0s1 to disk0
        our_disk = self.get_top_identifier(disk)
        if not our_disk:
            # Unknown disk - nothing to look for
            return None
        our_term = ("/dev/" + our_disk).lower()
        found_disk = False
        our_text = ""
        for line in self.disk_text.split("\n"):
            lowered = line.lower()
            if not found_disk:
                # Require the identifier to end here, so that /dev/disk1
                # does not also match the header of /dev/disk10
                if lowered.startswith(our_term) and not lowered[len(our_term):len(our_term)+1].isdigit():
                    found_disk = True
                continue
            if lowered.startswith("/dev/disk"):
                # At the next disk - bail
                break
            if search_term.lower() in lowered:
                our_text = line
                break
        if not len(our_text):
            # Nothing found
            return None
        listed = "".join(our_text.strip().split(search_term)[1:])
        # Stores are comma separated - tolerate whitespace on either side
        our_stores = [x.strip() for x in listed.split(",") if x.strip()]
        for store in our_stores:
            if self.get_efi(store):
                return store
        return None

    def get_physical_store(self, disk):
        """Return the APFS physical store containing the EFI, or None."""
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        if not self.is_apfs(disk_id):
            return None
        return self._get_physical_disk(disk_id, "Physical Store ")

    def get_core_storage_pv(self, disk):
        """Return the core storage physical volume containing the EFI, or None."""
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        if not self.is_core_storage(disk_id):
            return None
        return self._get_physical_disk(disk_id, "Logical Volume on ")

    def get_parent(self, disk):
        """Return the identifier of the disk or container that holds disk.

        disk can be a mount point, disk name, or disk identifier.  APFS and
        core storage volumes are first mapped to their physical store.
        Returns None when no parent is found.
        """
        disk_id = self.get_identifier(disk)
        if self.is_apfs(disk_id):
            disk_id = self.get_physical_store(disk_id)
        elif self.is_core_storage(disk_id):
            disk_id = self.get_core_storage_pv(disk_id)
        if not disk_id:
            return None
        target = disk_id.lower()
        if self.is_apfs(disk_id):
            # We have apfs - let's get the container ref
            for a in self.apfs.get("Containers", []):
                # Check if it's the whole container
                if a.get("ContainerReference", "").lower() == target:
                    return a["ContainerReference"]
                # Check through each volume and return the parent's container ref
                for v in a.get("Volumes", []):
                    if v.get("DeviceIdentifier", "").lower() == target:
                        return a.get("ContainerReference", None)
        else:
            # Not apfs - go through all volumes and whole disks
            for d in self.disks.get("AllDisksAndPartitions", []):
                if d.get("DeviceIdentifier", "").lower() == target:
                    return d["DeviceIdentifier"]
                for p in d.get("Partitions", []):
                    if p.get("DeviceIdentifier", "").lower() == target:
                        return d["DeviceIdentifier"]
        # Didn't find anything
        return None

    def get_efi(self, disk):
        """Return the identifier of the EFI partition on disk's parent, or None."""
        disk_id = self.get_parent(self.get_identifier(disk))
        if not disk_id:
            return None
        # At this point - we should have the parent
        for d in self.disks.get("AllDisksAndPartitions", []):
            if d.get("DeviceIdentifier", "").lower() == disk_id.lower():
                # Found our disk
                for p in d.get("Partitions", []):
                    if p.get("Content", "").lower() == "efi":
                        return p.get("DeviceIdentifier", None)
        return None

    def mount_partition(self, disk):
        """Mount disk with ``diskutil mount`` and refresh the cached disk info.

        sudo is requested when the OS version is at least
        self.sudo_mount_version and the disk's Content is listed in
        self.sudo_mount_types.  Returns the result of the run call, or None
        when the disk cannot be resolved.
        """
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        # Content can be missing (None), so fall back to an empty string
        content = (self.get_content(disk_id) or "").lower()
        at_least_sudo_version = not self._compare_versions(self.full_os_version, self.sudo_mount_version)
        sudo = bool(at_least_sudo_version and content in self.sudo_mount_types)
        out = self.r.run({"args":[self.diskutil, "mount", disk_id], "sudo":sudo})
        self._update_disks()
        return out

    def unmount_partition(self, disk):
        """Unmount disk with ``diskutil unmount`` and refresh the cached disk info.

        Returns the result of the run call, or None when the disk cannot be
        resolved.
        """
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        out = self.r.run({"args":[self.diskutil, "unmount", disk_id]})
        self._update_disks()
        return out

    def is_mounted(self, disk):
        """Return True if disk has a non-empty mount point, False otherwise.

        Returns None when the disk cannot be resolved.
        """
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        return bool(self.get_mount_point(disk_id))

    def get_volumes(self):
        """Return the "VolumesFromDisks" list of the cached disk info."""
        return self.disks.get("VolumesFromDisks", [])

    def _get_value_apfs(self, disk, field, default = None):
        """Like _get_value(), but only APFS volume entries are searched."""
        return self._get_value(disk, field, default, True)

    def _get_value(self, disk, field, default = None, apfs_only = False):
        """Return field from the cached entry that matches disk.

        APFS volumes are searched first, then (unless apfs_only is set) whole
        disks and their partitions.  default is returned when the matching
        entry lacks field; None is returned when the disk cannot be resolved
        or no entry matches.
        """
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        target = disk_id.lower()
        for d in self.disks.get("AllDisksAndPartitions", []):
            for a in d.get("APFSVolumes", []):
                if a.get("DeviceIdentifier", "").lower() == target:
                    return a.get(field, default)
            if apfs_only:
                # Skip looking at regular partitions
                continue
            if d.get("DeviceIdentifier", "").lower() == target:
                return d.get(field, default)
            for a in d.get("Partitions", []):
                if a.get("DeviceIdentifier", "").lower() == target:
                    return a.get(field, default)
        return None

    # Getter methods
    def get_content(self, disk):
        """Return the "Content" value for disk, or None."""
        return self._get_value(disk, "Content")

    def get_volume_name(self, disk):
        """Return the "VolumeName" value for disk, or None."""
        return self._get_value(disk, "VolumeName")

    def get_volume_uuid(self, disk):
        """Return the "VolumeUUID" value for disk, or None."""
        return self._get_value(disk, "VolumeUUID")

    def get_disk_uuid(self, disk):
        """Return the "DiskUUID" value for disk, or None."""
        return self._get_value(disk, "DiskUUID")

    def get_mount_point(self, disk):
        """Return the "MountPoint" value for disk, or None."""
        return self._get_value(disk, "MountPoint")

    def open_mount_point(self, disk, new_window = False):
        """Run ``open`` on disk's mount point.

        Returns True when the command exits with 0, False otherwise, and None
        when the disk cannot be resolved or has no mount point.  new_window
        is accepted but currently unused.
        """
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        mount = self.get_mount_point(disk_id)
        if not mount:
            return None
        out = self.r.run({"args":["open", mount]})
        return out[2] == 0

    def get_mounted_volumes(self):
        """Return the non-empty lines printed by ``ls -1 /Volumes``."""
        listing = self.r.run({"args":["ls", "-1", "/Volumes"]})[0]
        return [x for x in listing.split("\n") if x != ""]

    def get_mounted_volume_dicts(self):
        """Return a list of dicts describing each mounted volume.

        Each dict holds the "name", "identifier", "mount_point", "disk_uuid"
        and "volume_uuid" of one entry from get_mounted_volumes().
        """
        vol_list = []
        for v in self.get_mounted_volumes():
            i = self.get_identifier(os.path.join("/Volumes", v))
            if i is None:
                # Not resolvable through /Volumes - see if it's the boot drive
                i = self.get_identifier("/")
                if not self.get_volume_name(i) == v:
                    # Not valid and not our boot drive
                    continue
            vol_list.append({
                "name" : self.get_volume_name(i),
                "identifier" : i,
                "mount_point" : self.get_mount_point(i),
                "disk_uuid" : self.get_disk_uuid(i),
                "volume_uuid" : self.get_volume_uuid(i)
            })
        return vol_list

    def get_disks_and_partitions_dict(self):
        """Return the partitions of every disk, keyed by parent identifier.

        The result looks like so:
        { "disk0" : { "partitions" : [
            {
                "identifier" : "disk0s1",
                "name" : "EFI",
                "mount_point" : "/Volumes/EFI",
                "disk_uuid" : ...,
                "volume_uuid" : ...
            }
        ] } }
        """
        disks = {}
        for d in self.disks.get("AllDisks", []):
            top_disk = self.get_top_identifier(d)
            if top_disk == d and not self.is_core_storage(d):
                # Top level, skip
                continue
            # Not top level - make sure it's not an apfs container or core storage container
            if self.is_apfs_container(d):
                continue
            if self.is_cs_container(d):
                continue
            # Only look up the parent for entries we actually keep
            parent = self.get_parent(d)
            disks.setdefault(parent, { "partitions" : [] })["partitions"].append({
                "name" : self.get_volume_name(d),
                "identifier" : d,
                "mount_point" : self.get_mount_point(d),
                "disk_uuid" : self.get_disk_uuid(d),
                "volume_uuid" : self.get_volume_uuid(d)
            })
        return disks
470
471