Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
S2-group
GitHub Repository: S2-group/android-runner
Path: blob/master/AndroidRunner/pyand/ADB.py
630 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
4
try:
5
import sys
6
import subprocess
7
import re
8
import platform
9
import shlex
10
from os import popen as pipe
11
except ImportError as e:
12
print("[!] Required module missing. %s" % e.args[0])
13
sys.exit(-1)
14
15
16
# noinspection PyUnusedLocal
17
class ADB(object):
18
__adb_path = None
19
__output = None
20
__error = None
21
__devices = None
22
__target = None
23
24
# reboot modes
25
REBOOT_NORMAL = 0
26
REBOOT_RECOVERY = 1
27
REBOOT_BOOTLOADER = 2
28
29
# default TCP/IP port
30
DEFAULT_TCP_PORT = 5555
31
# default TCP/IP host
32
DEFAULT_TCP_HOST = "localhost"
33
34
def __init__(self, adb_path="adb"):
35
# By default we assume adb is in $PATH
36
self.__adb_path = adb_path
37
if not self.check_path():
38
self.__error = "[!] adb path not valid"
39
40
def __clean__(self):
41
self.__output = None
42
self.__error = None
43
44
def __read_output__(self, fd):
45
ret = ''
46
while 1:
47
line = fd.readline()
48
if not line:
49
break
50
ret += line
51
52
if len(ret) == 0:
53
ret = None
54
55
return ret
56
57
def __build_command__(self, cmd):
58
"""
59
Build command parameters
60
"""
61
if self.__devices is not None and len(self.__devices) > 1 and self.__target is None:
62
self.__error = "[!] Must set target device first"
63
return None
64
65
if type(cmd) is tuple:
66
a = list(cmd)
67
elif type(cmd) is list:
68
a = cmd
69
else:
70
# All arguments must be single list items
71
a = shlex.split(cmd)
72
73
a.insert(0, self.__adb_path)
74
if self.__target is not None:
75
# add target device arguments to the command
76
a.insert(1, '-s')
77
a.insert(2, self.__target)
78
79
return a
80
81
def run_cmd(self, cmd):
82
"""
83
Run a command against the adb tool ($ adb <cmd>)
84
"""
85
self.__clean__()
86
87
if self.__adb_path is None:
88
self.__error = "[!] ADB path not set"
89
return False
90
91
try:
92
args = self.__build_command__(cmd)
93
if args is None:
94
return
95
cmdp = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
96
self.__output, self.__error = cmdp.communicate()
97
retcode = cmdp.wait()
98
if "device unauthorized" in self.__output.decode('utf-8'):
99
self.__error = "[-] Device unauthorized"
100
return False
101
return self.__output.decode('utf-8').rstrip('\n')
102
except OSError as error:
103
self.__error = str(error)
104
105
return
106
107
def get_version(self):
108
"""
109
Returns ADB tool version
110
adb version
111
"""
112
ret = self.run_cmd("version")
113
try:
114
pattern = re.compile(r"version\s(.+)")
115
version = pattern.findall(ret)[0]
116
except:
117
version = None
118
return version
119
120
def check_path(self):
121
"""
122
Verify if adb path is valid
123
"""
124
125
if self.get_version() is None:
126
print("[-] adb executable not found")
127
return False
128
return True
129
130
def set_adb_path(self, adb_path):
131
"""
132
Set the ADB tool path
133
"""
134
self.__adb_path = adb_path
135
self.check_path()
136
137
def get_adb_path(self):
138
"""
139
Returns the ADB tool path
140
"""
141
return self.__adb_path
142
143
def start_server(self):
144
"""
145
Starts the ADB server
146
adb start-server
147
"""
148
self.run_cmd('start-server')
149
return self.__output
150
151
def kill_server(self):
152
"""
153
Kills the ADB server
154
adb kill-server
155
"""
156
self.run_cmd('kill-server')
157
158
def restart_server(self):
159
"""
160
Restarts the ADB server
161
"""
162
self.kill_server()
163
return self.start_server()
164
165
def restore_file(self, file_name):
166
"""
167
Restore device contents from the <file> backup archive
168
adb restore <file>
169
"""
170
self.run_cmd(['restore', file_name])
171
return self.__output
172
173
def wait_for_device(self):
174
"""
175
Block operations until device is online
176
adb wait-for-device
177
"""
178
self.run_cmd('wait-for-device')
179
return self.__output
180
181
def get_help(self):
182
"""
183
Returns ADB help
184
adb help
185
"""
186
self.run_cmd('help')
187
return self.__output
188
189
def get_devices(self):
190
"""
191
Return a dictionary of connected devices along with an incremented Id.
192
adb devices
193
"""
194
error = 0
195
# Clear existing list of devices
196
self.__devices = None
197
self.run_cmd("devices")
198
device_dict = {}
199
if self.__error is not None:
200
return None
201
try:
202
n = 0
203
output_list = self.__output.decode('utf-8').split("\n")
204
# Split on \r if we are on Windows
205
if platform.system().lower == "windows":
206
output_list = self.__output.decode('utf-8').split("\r")
207
208
for line in output_list:
209
pattern = re.compile(r"([^\s]+)\t+.+$")
210
device = pattern.findall(line)
211
if device:
212
device_dict[n] = device[0]
213
n += 1
214
except:
215
self.__devices = None
216
error = 1
217
self.__devices = device_dict
218
return self.__devices
219
220
def set_target_by_name(self, device):
221
"""
222
Specify the device name to target
223
example: set_target_device('emulator-5554')
224
"""
225
if device is None or self.__devices is None or device not in list(self.__devices.values()):
226
self.__error = 'Must get device list first'
227
print("[!] Device not found in device list")
228
return False
229
self.__target = device
230
return "[+] Target device set: %s" % self.get_target_device()
231
232
def set_target_by_id(self, device):
233
"""
234
Specify the device ID to target.
235
The ID should be one from the device list.
236
"""
237
if device is None or self.__devices is None or device not in self.__devices:
238
self.__error = 'Must get device list first'
239
print("[!] Device not found in device list")
240
return False
241
self.__target = self.__devices[device]
242
return "[+] Target device set: %s" % self.get_target_device()
243
244
def get_target_device(self):
245
"""
246
Returns the selected device to work with
247
"""
248
if self.__target is None:
249
print("[*] No device target set")
250
251
return self.__target
252
253
def get_state(self):
254
"""
255
Get ADB state. Returns either offline | offline | device
256
adb get-state
257
"""
258
return self.run_cmd('get-state')
259
260
def get_model(self):
261
"""
262
Get Model name from target device
263
"""
264
self.run_cmd("devices -l")
265
device_model = ""
266
if self.__error is not None:
267
return self.__error
268
try:
269
for line in self.__output.decode('utf-8').split("\n"):
270
if line.startswith(self.__target):
271
pattern = r"model:(.+)\sdevice"
272
pat = re.compile(pattern)
273
device_model = pat.findall(line)
274
device_model = re.sub(r"[\[\]\'{\}<>]", '', str(device_model))
275
except Exception as error:
276
return "[-] Error: %s" % error.args[0]
277
278
return device_model
279
280
def get_serialno(self):
281
"""
282
Get serialno from target device
283
adb get-serialno
284
"""
285
return self.run_cmd('get-serialno')
286
287
def reboot_device(self, mode=0):
288
"""
289
Reboot the target device
290
Specify mode to reboot normally, recovery or bootloader
291
adb reboot <normally (0)/recovery (1) /bootloader (2)>
292
"""
293
if mode not in (self.REBOOT_NORMAL, self.REBOOT_RECOVERY, self.REBOOT_BOOTLOADER):
294
self.__error = "mode must be REBOOT_NORMAL/REBOOT_RECOVERY/REBOOT_BOOTLOADER"
295
return self.__output
296
297
cmd_str = "reboot"
298
if mode == self.REBOOT_RECOVERY:
299
cmd_str += " recovery"
300
elif mode == self.REBOOT_BOOTLOADER:
301
cmd_str += " bootloader"
302
303
return self.run_cmd(cmd_str)
304
305
def set_adb_root(self, mode):
306
"""
307
restarts the adbd daemon with root permissions
308
adb root
309
"""
310
return self.run_cmd('root')
311
312
def set_system_rw(self):
313
"""
314
Mounts /system as rw
315
adb remount
316
"""
317
self.run_cmd("remount")
318
return self.__output
319
320
def get_remote_file(self, remote, local):
321
"""
322
Pulls a remote file
323
adb pull remote local
324
"""
325
self.run_cmd(['pull', remote, local])
326
if "bytes in" in self.__error:
327
self.__output = self.__error
328
self.__error = None
329
return self.__output
330
331
def push_local_file(self, local, remote):
332
"""
333
Push a local file
334
adb push local remote
335
"""
336
self.run_cmd(['push', local, remote])
337
return self.__output
338
339
def shell_command(self, cmd):
340
"""
341
Executes a shell command
342
adb shell <cmd>
343
"""
344
self.run_cmd(['shell', cmd])
345
return self.__output
346
347
def listen_usb(self):
348
"""
349
Restarts the adbd daemon listening on USB
350
adb usb
351
"""
352
self.run_cmd("usb")
353
return self.__output
354
355
def listen_tcp(self, port=DEFAULT_TCP_PORT):
356
"""
357
Restarts the adbd daemon listening on the specified port
358
adb tcpip <port>
359
"""
360
self.run_cmd(['tcpip', str(port)])
361
return self.__output
362
363
def get_bugreport(self):
364
"""
365
Return all information from the device that should be included in a bug report
366
adb bugreport
367
"""
368
self.run_cmd("bugreport")
369
return self.__output
370
371
def get_jdwp(self):
372
"""
373
List PIDs of processes hosting a JDWP transport
374
adb jdwp
375
"""
376
return self.run_cmd("jdwp")
377
378
def get_logcat(self, lcfilter=""):
379
"""
380
View device log
381
adb logcat <filter>
382
"""
383
self.run_cmd(['logcat', lcfilter])
384
return self.__output
385
386
def run_emulator(self, cmd=""):
387
"""
388
Run emulator console command
389
"""
390
self.run_cmd(['emu', cmd])
391
return self.__output
392
393
def connect_remote(self, host=DEFAULT_TCP_HOST, port=DEFAULT_TCP_PORT):
394
"""
395
Connect to a device via TCP/IP
396
adb connect host:port
397
"""
398
self.run_cmd("connect %s:%s" % (host, port))
399
return self.__output
400
401
def disconnect_remote(self, host=DEFAULT_TCP_HOST, port=DEFAULT_TCP_PORT):
402
"""
403
Disconnect from a TCP/IP device
404
adb disconnect host:port
405
"""
406
self.run_cmd("disconnect %s:%s" % (host, port))
407
return self.__output
408
409
def ppp_over_usb(self, tty=None, params=""):
410
"""
411
Run PPP over USB
412
adb ppp <tty> <params>
413
"""
414
if tty is None:
415
return self.__output
416
417
cmd = "ppp %s" % tty
418
if params != "":
419
cmd += " %s" % params
420
421
self.run_cmd(cmd)
422
return self.__output
423
424
def sync_directory(self, directory=""):
425
"""
426
Copy host->device only if changed (-l means list but don't copy)
427
adb sync <dir>
428
"""
429
self.run_cmd(['sync', directory])
430
return self.__output
431
432
def forward_socket(self, local=None, remote=None):
433
"""
434
Forward socket connections
435
adb forward <local> <remote>
436
"""
437
if local is None or remote is None:
438
return self.__output
439
self.run_cmd("forward %s %s" % (local, remote))
440
return self.__output
441
442
def uninstall(self, package=None, keepdata=False):
443
"""
444
Remove this app package from the device
445
adb uninstall [-k] package
446
"""
447
if package is None:
448
return self.__output
449
cmd = ['uninstall']
450
if not keepdata:
451
cmd += ['-k']
452
cmd += [package]
453
self.run_cmd(cmd)
454
return self.__output
455
456
def install(self, pkgapp=None, fwdlock=False, reinstall=False, sdcard=False):
457
"""
458
Push this package file to the device and install it
459
adb install [-l] [-r] [-s] <file>
460
-l -> forward-lock the app
461
-r -> reinstall the app, keeping its data
462
-s -> install on sdcard instead of internal storage
463
"""
464
465
if pkgapp is None:
466
return self.__output
467
468
cmd = ["install"]
469
if fwdlock is True:
470
cmd += ["-l"]
471
if reinstall is True:
472
cmd += ["-r"]
473
if sdcard is True:
474
cmd += ["-s"]
475
cmd += [pkgapp]
476
477
self.run_cmd(cmd)
478
return self.__output
479
480
def find_binary(self, name=None):
481
"""
482
Look for a binary file on the device
483
"""
484
485
self.shell_command("which %s" % name)
486
487
if self.__output is None: # not found
488
self.__error = "'%s' was not found" % name
489
elif self.__output.strip() == "which: not found": # which binary not available
490
self.__output = None
491
self.__error = "which binary not found"
492
else:
493
self.__output = self.__output.strip()
494
495
return self.__output
496
497
def wake_device(self):
498
return self.run_cmd('shell input keyevent 26')
499
500
def sideload(self, otapackage=None):
501
if otapackage is None:
502
return self.__output
503
self.run_cmd(["sideload", otapackage])
504
return self.__output
505
506
def get_devpath(self):
507
return self.run_cmd('get-devpath')
508
509