Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
datalux
GitHub Repository: datalux/osintgram
Path: blob/master/src/hikercli.py
271 views
1
import datetime
2
import json
3
import sys
4
import urllib
5
import codecs
6
from pathlib import Path
7
8
import ssl
9
10
# WARNING: replaces the default HTTPS context factory with the unverified one,
# so HTTPS requests that rely on ssl's default context skip certificate
# verification for the rest of the process.
ssl._create_default_https_context = ssl._create_unverified_context
11
12
from geopy.geocoders import Nominatim
13
from hikerapi import Client as AppClient
14
from hikerapi import __version__ as hk
15
16
from prettytable import PrettyTable
17
18
from src import printcolors as pc
19
from src import config
20
21
22
class HikerCLI:
    """Command set that inspects one Instagram target through the HikerAPI client."""

    # API client handles; ``api`` is assigned in ``__init__``.
    api = None
    api2 = None

    # Reverse geocoder shared by all instances (created when the class is defined).
    geolocator = Nominatim(user_agent="http")

    # Target state; ``target``, ``target_id`` and ``is_private`` are filled in by ``setTarget``.
    target = ""
    target_id = None
    user_id = None
    is_private = True
    following = False

    # Output switches and location.
    cli_mode = False
    writeFile = False
    jsonDump = False
    output_dir = "output"
35
36
def __init__(self, target, is_file, is_json, is_cli, output_dir, clear_cookies):
    """Create the API client, select ``target`` and store the output options.

    ``output_dir`` falls back to the class default when it is falsy.
    ``clear_cookies`` is accepted but not used by this method.
    """
    self.output_dir = output_dir if output_dir else self.output_dir
    token = config.getHikerToken()
    self.cli_mode = is_cli
    if not self.cli_mode:
        print("\nConnect to HikerAPI...")
    self.api = AppClient(token=token)
    # The target is resolved before the output flags are stored, exactly as before.
    self.setTarget(target)
    self.writeFile = is_file
    self.jsonDump = is_json
46
47
def setTarget(self, target):
    """Select ``target`` as the active profile and prepare its output folder.

    Looks the user up through ``get_user``, stores its ``pk`` and privacy
    flag, prints the banner and creates ``<base output dir>/<target>``.

    The base directory is remembered on first use, so calling this method
    again (for example from ``change_target``) puts the new target next to
    the previous one instead of nesting it inside the previous target's
    folder.
    """
    base_dir = getattr(self, "_base_output_dir", None)
    if base_dir is None:
        # First call: ``output_dir`` still holds the configured base directory.
        base_dir = self.output_dir
        self._base_output_dir = base_dir

    self.target = target
    self.user = self.get_user(target)
    self.target_id = self.user["pk"]
    self.is_private = self.user["is_private"]
    self.__printTargetBanner__()

    self.output_dir = base_dir + "/" + str(self.target)
    Path(self.output_dir).mkdir(parents=True, exist_ok=True)
55
56
def __get_feed__(self, limit=-1):
    """Return the target's media items, following pagination.

    With ``limit`` greater than -1, paging stops once at least ``limit``
    items were collected; the returned list is not truncated.
    """
    items = []
    cursor = ""
    keep_going = True
    while keep_going:
        pc.printout("@", pc.CYAN)
        page = self.api.user_medias_v2(self.target_id, page_id=cursor)
        items.extend(page.get("response", {}).get("items", []))
        cursor = page.get("next_page_id")
        limit_reached = limit > -1 and len(items) >= limit
        keep_going = bool(cursor) and not limit_reached
    return items
69
70
def __get_comments__(self, media_id, limit=-1):
    """Return the comments of ``media_id``, following pagination.

    An API error whose text contains "Entries not found" ends the listing
    and returns what was collected so far; any other error is printed and
    re-raised. ``limit`` stops paging early but does not truncate.
    """
    collected = []
    cursor = ""
    while True:
        pc.printout("@", pc.CYAN)
        try:
            page = self.api.media_comments_v2(media_id, page_id=cursor)
        except Exception as err:
            text = str(err)
            pc.printout(text, pc.RED)
            if "Entries not found" not in text:
                raise err
            return collected
        collected.extend(page.get("response", {}).get("comments", []))
        cursor = page.get("next_page_id")
        if (limit > -1 and len(collected) >= limit) or not cursor:
            return collected
90
91
def __printTargetBanner__(self):
    """Print the active target name, its id and a marker for private profiles."""
    coloured_parts = (
        ("Target: ", pc.GREEN),
        (str(self.target), pc.CYAN),
    )
    for text, colour in coloured_parts:
        pc.printout(text, colour)
    pc.printout(" [" + str(self.target_id) + "]")
    if self.is_private:
        pc.printout(" [PRIVATE PROFILE]", pc.BLUE)
    print("\n")
98
99
def change_target(self):
    """Prompt for a username on stdin and make it the active target."""
    pc.printout("Insert new target username: ", pc.YELLOW)
    new_target = input()
    self.setTarget(new_target)
104
105
def get_addrs(self):
    """Reverse-geocode the locations attached to the target's posts.

    Prints a table of addresses sorted by post time (newest first) and,
    depending on ``writeFile`` / ``jsonDump``, also writes
    ``<target>_addrs.txt`` and ``<target>_addrs.json``. Does nothing for
    private profiles.
    """
    if self.check_private_profile():
        return

    pc.printout("Searching for target localizations...\n")

    # "lat, lng" -> post timestamp; a later post at the same coordinates
    # replaces the earlier one.
    locations = {}
    for post in self.__get_feed__():
        location = post.get("location")
        if location and "lat" in location and "lng" in location:
            coords = str(location["lat"]) + ", " + str(location["lng"])
            locations[coords] = post.get("taken_at")

    addresses = {}
    for coords, taken_at in locations.items():
        details = self.geolocator.reverse(coords)
        if details is None:
            # The geocoder found nothing for these coordinates; skip them
            # instead of failing on ``details.address``.
            continue
        if taken_at is None:
            # Unknown time: the empty string sorts after every real timestamp.
            when = ""
        else:
            when = datetime.datetime.fromtimestamp(taken_at).strftime(
                "%Y-%m-%d %H:%M:%S"
            )
        addresses[details.address] = when

    sort_addresses = sorted(addresses.items(), key=lambda p: p[1], reverse=True)
    if not sort_addresses:
        pc.printout("Sorry! No results found :-(\n", pc.RED)
        return

    t = PrettyTable()
    t.field_names = ["Post", "Address", "Time"]
    t.align["Post"] = "l"
    t.align["Address"] = "l"
    t.align["Time"] = "l"
    pc.printout(
        "\nWoohoo! We found " + str(len(sort_addresses)) + " addresses\n",
        pc.GREEN,
    )

    for index, (address, time) in enumerate(sort_addresses, start=1):
        t.add_row([str(index), address, time])

    if self.writeFile:
        file_name = self.output_dir + "/" + self.target + "_addrs.txt"
        with open(file_name, "w") as f:
            f.write(str(t))

    if self.jsonDump:
        json_data = {
            "address": [
                {"address": address, "time": time}
                for address, time in sort_addresses
            ]
        }
        json_file_name = self.output_dir + "/" + self.target + "_addrs.json"
        with open(json_file_name, "w") as f:
            json.dump(json_data, f)

    print(t)
171
172
def get_captions(self):
    """Print the captions of the target's posts and optionally export them.

    With ``writeFile`` the captions go to ``<target>_captions.txt`` and with
    ``jsonDump`` to ``<target>_captions.json`` (previously the JSON was
    written to ``<target>_followings.json`` by mistake). Posts without a
    usable caption are skipped individually instead of ending the scan.
    """
    if self.check_private_profile():
        return

    pc.printout("Searching for target captions...\n")

    captions = []
    for item in self.__get_feed__():
        caption = item.get("caption")
        text = caption.get("text") if isinstance(caption, dict) else None
        if text is None:
            continue
        captions.append(text)
        sys.stdout.write("\rFound %i" % len(captions))
        sys.stdout.flush()

    if not captions:
        pc.printout("Sorry! No results found :-(\n", pc.RED)
        return

    pc.printout(
        "\nWoohoo! We found " + str(len(captions)) + " captions\n", pc.GREEN
    )

    for s in captions:
        print(s + "\n")

    if self.writeFile:
        file_name = self.output_dir + "/" + self.target + "_captions.txt"
        # Captions routinely contain emoji; write UTF-8 regardless of locale.
        with open(file_name, "w", encoding="utf-8") as f:
            f.writelines(s + "\n" for s in captions)

    if self.jsonDump:
        json_data = {"captions": captions}
        json_file_name = self.output_dir + "/" + self.target + "_captions.json"
        with open(json_file_name, "w") as f:
            json.dump(json_data, f)

    return
231
232
def get_total_comments(self):
    """Report total, minimum, maximum and average comment count per post.

    Optionally writes ``<target>_comments.txt`` / ``<target>_comments.json``.
    An empty feed now prints the usual "no results" message instead of
    failing in ``min()`` and the average division.
    """
    if self.check_private_profile():
        return

    pc.printout("Searching for target total comments...\n")

    data = self.__get_feed__()

    # A missing or null count is treated as zero comments.
    comments = [int(p.get("comment_count") or 0) for p in data]
    if not comments:
        pc.printout("Sorry! No results found :-(\n", pc.RED)
        return

    posts = len(comments)
    comments_counter = sum(comments)
    min_ = min(comments)
    max_ = max(comments)
    avg = comments_counter // posts
    result = f" comments in {posts} posts (min: {min_}, max: {max_}, avg: {avg})\n"

    if self.writeFile:
        file_name = self.output_dir + "/" + self.target + "_comments.txt"
        with open(file_name, "w") as f:
            f.write(str(comments_counter) + " comments in " + str(posts) + " posts\n")

    if self.jsonDump:
        json_data = {"comment_counter": comments_counter, "posts": posts}
        json_file_name = self.output_dir + "/" + self.target + "_comments.json"
        with open(json_file_name, "w") as f:
            json.dump(json_data, f)

    pc.printout(str(comments_counter), pc.MAGENTA)
    pc.printout(result)
264
265
def get_comment_data(self):
    """List every comment on every post of the target.

    Prints a table (post id, commenter id, username, text) and optionally
    writes ``<target>_comment_data.txt`` / ``<target>_comment_data.json``.
    The JSON file is produced with ``json.dump`` as
    ``{"Comments": [...]}``; the previous hand-assembled text had no
    separators between objects and a trailing comma, so it was not valid
    JSON.
    """
    if self.check_private_profile():
        return

    pc.printout("Retrieving all comments, this may take a moment...\n")
    data = self.__get_feed__()

    _comments = []
    t = PrettyTable(["POST ID", "ID", "Username", "Comment"])
    t.align["POST ID"] = "l"
    t.align["ID"] = "l"
    t.align["Username"] = "l"
    t.align["Comment"] = "l"

    for post in data:
        post_id = post.get("id")
        for raw in self.__get_comments__(post_id):
            entry = {
                "post_id": post_id,
                "user_id": raw.get("user_id"),
                "username": raw.get("user", {}).get("username"),
                "comment": raw.get("text"),
            }
            t.add_row(
                [post_id, entry["user_id"], entry["username"], entry["comment"]]
            )
            _comments.append(entry)

    print(t)

    if self.writeFile:
        file_name = self.output_dir + "/" + self.target + "_comment_data.txt"
        # Comment text is free-form user input; write UTF-8 regardless of locale.
        with open(file_name, "w", encoding="utf-8") as f:
            f.write(str(t))

    if self.jsonDump:
        file_name_json = self.output_dir + "/" + self.target + "_comment_data.json"
        with open(file_name_json, "w") as f:
            json.dump({"Comments": _comments}, f)
312
313
def get_followers(self):
    """Print the target's followers and optionally export them.

    Writes ``<target>_followers.txt`` when ``writeFile`` is set and
    ``<target>_followers.json`` when ``jsonDump`` is set.
    """
    if self.check_private_profile():
        return

    pc.printout("Searching for target followers...\n")

    fetched = []
    cursor = ""
    while True:
        pc.printout("@", pc.CYAN)
        page = self.api.user_followers_v2(self.target_id, page_id=cursor)
        fetched.extend(page.get("response", {}).get("users", []))
        cursor = page.get("next_page_id")
        if not cursor:
            break

    print("\n")

    columns = ["ID", "Username", "Full Name"]
    table = PrettyTable(columns)
    for column in columns:
        table.align[column] = "l"

    copied_keys = ("username", "full_name", "profile_pic_url", "is_private", "is_verified")
    records = []
    for user in fetched:
        record = {"id": user["pk"]}
        for key in copied_keys:
            record[key] = user[key]
        table.add_row([str(record["id"]), record["username"], record["full_name"]])
        # Records are only kept when a JSON export was requested.
        if self.jsonDump:
            records.append(record)

    if self.writeFile:
        txt_path = self.output_dir + "/" + self.target + "_followers.txt"
        with open(txt_path, "w") as handle:
            handle.write(str(table))

    if self.jsonDump:
        json_path = self.output_dir + "/" + self.target + "_followers.json"
        with open(json_path, "w") as handle:
            json.dump({"followers": records}, handle)

    print(table)
364
365
def get_followings(self):
    """Print the accounts the target follows and optionally export them.

    Writes ``<target>_followings.txt`` when ``writeFile`` is set and
    ``<target>_followings.json`` when ``jsonDump`` is set.
    """
    if self.check_private_profile():
        return

    pc.printout("Searching for target followings...\n")

    fetched = []
    cursor = ""
    while True:
        pc.printout("@", pc.CYAN)
        page = self.api.user_following_v2(self.target_id, page_id=cursor)
        fetched.extend(page.get("response", {}).get("users", []))
        cursor = page.get("next_page_id")
        if not cursor:
            break

    print("\n")

    columns = ["ID", "Username", "Full Name"]
    table = PrettyTable(columns)
    for column in columns:
        table.align[column] = "l"

    copied_keys = ("username", "full_name", "profile_pic_url", "is_private", "is_verified")
    records = []
    for user in fetched:
        record = {"id": user["pk"]}
        for key in copied_keys:
            record[key] = user[key]
        table.add_row([str(record["id"]), record["username"], record["full_name"]])
        # Records are only kept when a JSON export was requested.
        if self.jsonDump:
            records.append(record)

    if self.writeFile:
        txt_path = self.output_dir + "/" + self.target + "_followings.txt"
        with open(txt_path, "w") as handle:
            handle.write(str(table))

    if self.jsonDump:
        json_path = self.output_dir + "/" + self.target + "_followings.json"
        with open(json_path, "w") as handle:
            json.dump({"followings": records}, handle)

    print(table)
416
417
def get_hashtags(self):
    """Count the hashtags used in the target's captions, most frequent first.

    Prints ``"<count>. <hashtag>"`` lines and optionally writes
    ``<target>_hashtags.txt`` / ``<target>_hashtags.json``. Posts whose
    caption is missing or empty are skipped rather than raising.
    """
    from collections import Counter

    if self.check_private_profile():
        return

    pc.printout("Searching for target hashtags...\n")

    hashtag_counter = Counter()
    for post in self.__get_feed__():
        caption = post.get("caption")
        if not caption:
            continue
        text = caption.get("text") or ""
        hashtag_counter.update(
            word for word in text.split() if word.startswith("#")
        )

    if not hashtag_counter:
        pc.printout("Sorry! No results found :-(\n", pc.RED)
        return

    # most_common() orders by count, ties in first-seen order.
    ssort = hashtag_counter.most_common()
    lines = [f"{count}. {hashtag}" for hashtag, count in ssort]

    print()
    for line in lines:
        print(line)

    if self.writeFile:
        file_name = self.output_dir + "/" + self.target + "_hashtags.txt"
        # Hashtags may contain non-ASCII text; write UTF-8 regardless of locale.
        with open(file_name, "w", encoding="utf-8") as f:
            f.writelines(f"{line}\n" for line in lines)

    if self.jsonDump:
        json_data = {"hashtags": [hashtag for hashtag, _ in ssort]}
        json_file_name = self.output_dir + "/" + self.target + "_hashtags.json"
        with open(json_file_name, "w") as f:
            json.dump(json_data, f)
472
473
def get_user_info(self):
    """Print the profile fields of the active target and optionally export them.

    With ``jsonDump`` the selected fields are written to
    ``<target>_info.json``.

    The "[FB PAGE]" line used to read ``connected_fb_page`` although only
    ``fb_page_call_to_action_id`` had been checked, which raised KeyError
    when the former was absent; it now falls back to the checked value
    (the same one the JSON export stores). The business-category fields
    are read tolerantly for the same reason.
    """
    data = self.user

    def show(label, value, colour=None):
        # One labelled line: coloured label followed by the plain value.
        if colour is None:
            pc.printout(label)
        else:
            pc.printout(label, colour)
        pc.printout(str(value) + "\n")

    show("[ID] ", data["pk"], pc.GREEN)
    show("[FULL NAME] ", data["full_name"], pc.RED)
    show("[BIOGRAPHY] ", data["biography"], pc.CYAN)
    show("[FOLLOWED] ", data["follower_count"], pc.BLUE)
    show("[FOLLOW] ", data["following_count"], pc.GREEN)
    show("[MEDIA] ", data["media_count"], pc.CYAN)
    show("[BUSINESS ACCOUNT] ", data["is_business"], pc.RED)
    if data["is_business"] and not data.get("can_hide_category"):
        show("[BUSINESS CATEGORY] ", data.get("category"))
    show("[VERIFIED ACCOUNT] ", data["is_verified"], pc.CYAN)
    if data.get("public_email"):
        show("[EMAIL] ", data["public_email"], pc.BLUE)
    show("[HD PROFILE PIC] ", data["hd_profile_pic_url_info"]["url"], pc.GREEN)
    if data.get("fb_page_call_to_action_id"):
        fb_page = data.get("connected_fb_page", data["fb_page_call_to_action_id"])
        show("[FB PAGE] ", fb_page, pc.RED)

    # Optional contact fields, printed only when present and non-empty.
    optional_fields = (
        ("[WHATSAPP NUMBER] ", "whatsapp_number", pc.GREEN),
        ("[CITY] ", "city_name", pc.YELLOW),
        ("[ADDRESS STREET] ", "address_street", pc.RED),
        ("[CONTACT PHONE NUMBER] ", "contact_phone_number", pc.CYAN),
    )
    for label, key, colour in optional_fields:
        if data.get(key):
            show(label, data[key], colour)

    if self.jsonDump:
        user = {
            "id": data["pk"],
            "full_name": data["full_name"],
            "biography": data["biography"],
            "edge_followed_by": data["follower_count"],
            "edge_follow": data["following_count"],
            "is_business_account": data["is_business"],
            "is_verified": data["is_verified"],
            "profile_pic_url_hd": data["hd_profile_pic_url_info"]["url"],
        }
        if data.get("public_email"):
            user["email"] = data["public_email"]
        if data.get("fb_page_call_to_action_id"):
            user["connected_fb_page"] = data["fb_page_call_to_action_id"]
        for _, key, _ in optional_fields:
            if data.get(key):
                user[key] = data[key]

        json_file_name = self.output_dir + "/" + self.target + "_info.json"
        with open(json_file_name, "w") as f:
            json.dump(user, f)
546
547
def get_total_likes(self):
    """Report total, minimum, maximum and average like count per post.

    Optionally writes ``<target>_likes.txt`` / ``<target>_likes.json``.
    An empty feed now prints the usual "no results" message instead of
    failing in ``min()`` and the average division.
    """
    if self.check_private_profile():
        return

    pc.printout("Searching for target total likes...\n")

    data = self.__get_feed__()

    # A missing or null count is treated as zero likes.
    likes = [int(p.get("like_count") or 0) for p in data]
    if not likes:
        pc.printout("Sorry! No results found :-(\n", pc.RED)
        return

    posts = len(likes)
    like_counter = sum(likes)
    min_ = min(likes)
    max_ = max(likes)
    avg = like_counter // posts
    result = f" likes in {posts} posts (min: {min_}, max: {max_}, avg: {avg})\n"

    if self.writeFile:
        file_name = self.output_dir + "/" + self.target + "_likes.txt"
        with open(file_name, "w") as f:
            f.write(str(like_counter) + result)

    if self.jsonDump:
        json_data = {
            "like_counter": like_counter,
            "posts": posts,
            "min": min_,
            "max": max_,
            "avg": avg,
        }
        json_file_name = self.output_dir + "/" + self.target + "_likes.json"
        with open(json_file_name, "w") as f:
            json.dump(json_data, f)

    pc.printout(f"\n{like_counter}", pc.MAGENTA)
    pc.printout(result)
583
584
def get_media_type(self):
    """Count how many of the target's posts are photos and how many are videos.

    ``media_type`` 1 is counted as a photo and 2 as a video; posts with any
    other value are checked but not counted in either total. Optionally
    writes ``<target>_mediatype.txt`` / ``<target>_mediatype.json``.
    """
    if self.check_private_profile():
        return

    # The progress message used to say "captions", copied from get_captions.
    pc.printout("Searching for target media types...\n")

    counter = 0
    photo_counter = 0
    video_counter = 0

    for post in self.__get_feed__():
        media_type = post.get("media_type")
        if media_type == 1:
            photo_counter += 1
        elif media_type == 2:
            video_counter += 1
        counter += 1
        sys.stdout.write("\rChecked %i" % counter)
        sys.stdout.flush()

    sys.stdout.write(" posts")
    sys.stdout.flush()

    if counter == 0:
        pc.printout("Sorry! No results found :-(\n", pc.RED)
        return

    summary = (
        str(photo_counter)
        + " photos and "
        + str(video_counter)
        + " video posted by target\n"
    )

    if self.writeFile:
        file_name = self.output_dir + "/" + self.target + "_mediatype.txt"
        with open(file_name, "w") as f:
            f.write(summary)

    pc.printout("\nWoohoo! We found " + summary, pc.GREEN)

    if self.jsonDump:
        json_data = {"photos": photo_counter, "videos": video_counter}
        json_file_name = self.output_dir + "/" + self.target + "_mediatype.json"
        with open(json_file_name, "w") as f:
            json.dump(json_data, f)
639
640
def get_people_who_commented(self):
    """Rank the users who commented on the target's posts by comment count.

    Prints a table and optionally writes
    ``<target>_users_who_commented.txt`` / ``.json``. Users are tallied in
    a dict keyed by ``pk`` (as ``get_people_who_tagged`` does) instead of
    scanning a list for every comment; the resulting order is unchanged.
    """
    if self.check_private_profile():
        return

    pc.printout("Searching for users who commented...\n")

    tally = {}
    for post in self.__get_feed__():
        for comment in self.__get_comments__(post["id"]):
            author = comment["user"]
            pk = author["pk"]
            entry = tally.get(pk)
            if entry is None:
                tally[pk] = {
                    "id": pk,
                    "username": author["username"],
                    "full_name": author["full_name"],
                    "counter": 1,
                }
            else:
                entry["counter"] += 1

    if not tally:
        pc.printout("Sorry! No results found :-(\n", pc.RED)
        return

    # Stable sort: users with equal counts stay in first-seen order.
    ssort = sorted(tally.values(), key=lambda value: value["counter"], reverse=True)

    t = PrettyTable()
    t.field_names = ["Comments", "ID", "Username", "Full Name"]
    t.align["Comments"] = "l"
    t.align["ID"] = "l"
    t.align["Username"] = "l"
    t.align["Full Name"] = "l"

    for u in ssort:
        t.add_row([str(u["counter"]), u["id"], u["username"], u["full_name"]])

    print(t)

    if self.writeFile:
        file_name = self.output_dir + "/" + self.target + "_users_who_commented.txt"
        with open(file_name, "w") as f:
            f.write(str(t))

    if self.jsonDump:
        json_data = {"users_who_commented": ssort}
        json_file_name = (
            self.output_dir + "/" + self.target + "_users_who_commented.json"
        )
        with open(json_file_name, "w") as f:
            json.dump(json_data, f)
701
702
def get_people_who_tagged(self):
    """Rank the users whose media tag the target by number of such media.

    Prints a table and optionally writes ``<target>_users_who_tagged.txt``
    / ``.json``.
    """
    if self.check_private_profile():
        return

    pc.printout("Searching for users who tagged target...\n")

    medias = []
    cursor = ""
    while True:
        pc.printout("@", pc.CYAN)
        page = self.api.user_tag_medias_v2(self.target_id, page_id=cursor)
        medias.extend(page.get("response", {}).get("items", []))
        cursor = page.get("next_page_id")
        if not cursor:
            break

    if len(medias) == 0:
        pc.printout("\nSorry! No results found :-(\n", pc.RED)
        return

    pc.printout(f"\nWoohoo! We found {len(medias)} medias\n", pc.GREEN)

    by_pk = {}
    for media in medias:
        owner = media["user"]
        pk = owner["pk"]
        if pk not in by_pk:
            by_pk[pk] = {
                "id": pk,
                "username": owner["username"],
                "full_name": owner["full_name"],
                "counter": 1,
            }
        else:
            by_pk[pk]["counter"] += 1

    ranked = list(by_pk.values())
    ranked.sort(key=lambda entry: entry["counter"], reverse=True)

    table = PrettyTable()
    table.field_names = ["Medias", "ID", "Username", "Full Name"]
    for column in ("Medias", "ID", "Username", "Full Name"):
        table.align[column] = "l"

    for entry in ranked:
        table.add_row(
            [str(entry["counter"]), entry["id"], entry["username"], entry["full_name"]]
        )

    print(table)

    if self.writeFile:
        txt_path = self.output_dir + "/" + self.target + "_users_who_tagged.txt"
        with open(txt_path, "w") as handle:
            handle.write(str(table))

    if self.jsonDump:
        json_path = self.output_dir + "/" + self.target + "_users_who_tagged.json"
        with open(json_path, "w") as handle:
            json.dump({"users_who_tagged": ranked}, handle)
766
767
def get_photo_description(self):
    """Report that photo descriptions are unavailable (after the private-profile check)."""
    if not self.check_private_profile():
        pc.printout("Instagram has disabled this functionality.\n", pc.RED)
771
772
def get_user_photo(self):
    """Download the target's photos into the output directory.

    Asks on stdin how many photos to fetch (empty or "all" means
    everything). Single-image posts and the entries of carousel posts are
    saved as ``<target>_<media id>.jpg``.

    Both kinds of media go through one download loop that stops as soon as
    the requested number is reached, and posts carrying neither
    ``image_versions2`` nor ``carousel_media`` are skipped instead of
    raising KeyError.
    """
    # ``import urllib`` alone does not guarantee the ``request`` submodule is loaded.
    import urllib.request

    if self.check_private_profile():
        return

    limit = -1
    pc.printout("How many photos you want to download (default all): ", pc.YELLOW)
    user_input = input()

    if user_input.lower() in ("", "all"):
        pc.printout("Downloading all photos available...\n")
    else:
        try:
            limit = int(user_input)
        except ValueError:
            pc.printout("Wrong value entered\n", pc.RED)
            return
        pc.printout(f"Downloading {user_input} photos...\n")

    data = self.__get_feed__(limit=limit)
    print()

    def iter_images():
        # Yield every downloadable image entry in feed order.
        for item in data:
            if "image_versions2" in item:
                yield item
            else:
                yield from item.get("carousel_media") or []

    counter = 0
    for media in iter_images():
        if limit > -1 and counter >= limit:
            break
        counter += 1
        url = media["image_versions2"]["candidates"][0]["url"]
        end = self.output_dir + "/" + self.target + "_" + str(media["id"]) + ".jpg"
        urllib.request.urlretrieve(url, end)
        sys.stdout.write("\rDownloaded %i" % counter)
        sys.stdout.flush()

    pc.printout(
        f"\nWoohoo! We downloaded {counter} medias (saved in {self.output_dir} folder) \n",
        pc.GREEN,
    )
821
822
def get_user_propic(self):
    """Save the target's profile picture as ``<target>_propic.jpg``."""
    profile = self.user
    if "hd_profile_pic_url_info" not in profile:
        # Fall back to the last listed version (commented upstream as the better quality one).
        versions = profile["hd_profile_pic_versions"]
        url = versions[len(versions) - 1]["url"]
    else:
        url = profile["hd_profile_pic_url_info"]["url"]

    if url == "":
        pc.printout("Sorry! No results found :-(\n", pc.RED)
        return

    destination = self.output_dir + "/" + self.target + "_propic.jpg"
    urllib.request.urlretrieve(url, destination)
    pc.printout("Target propic saved in output folder\n", pc.GREEN)
838
839
def get_user_stories(self):
    """Download the target's current stories (photos as .jpg, videos as .mp4).

    The reported number is the count of story items returned, whatever
    their media type.
    """
    if self.check_private_profile():
        return

    pc.printout("Searching for target stories...\n")
    data = self.api.user_stories_v2(self.target_id)

    # A falsy "reel" means there are no stories available.
    reel = data["reel"]
    stories = reel.get("items", []) if reel else []
    counter = len(stories)

    for story in stories:
        pc.printout("@", pc.CYAN)
        story_id = story["id"]
        media_type = story["media_type"]
        if media_type == 1:  # photo
            url = story["image_versions2"]["candidates"][0]["url"]
            suffix = ".jpg"
        elif media_type == 2:  # video
            url = story["video_versions"][0]["url"]
            suffix = ".mp4"
        else:
            continue
        destination = self.output_dir + "/" + self.target + "_" + story_id + suffix
        urllib.request.urlretrieve(url, destination)

    if counter > 0:
        pc.printout(f"\n{counter} target stories saved in output folder\n", pc.GREEN)
    else:
        pc.printout("\nSorry! No results found :-(\n", pc.RED)
867
868
def get_people_tagged_by_user(self):
    """List the users tagged in the target's posts with how often each appears.

    Prints a table and optionally writes ``<target>_tagged.txt`` /
    ``<target>_tagged.json``. The number shown in parentheses starts at 1
    and grows by one per tag encountered.
    """
    pc.printout("Searching for users tagged by target...\n")

    # pk -> record, kept in first-seen order.
    seen = {}
    counter = 1

    for post in self.__get_feed__():
        for tag in post.get("usertags", []):
            u = tag.get("user", {})
            pk = u["pk"]
            if pk in seen:
                seen[pk]["post"] += 1
            else:
                username = u["username"]
                full_name = u["full_name"]
                seen[pk] = {
                    "post": 1,
                    "full_name": full_name,
                    "username": username,
                    "id": pk,
                }
            counter += 1

    if not seen:
        pc.printout("Sorry! No results found :-(\n", pc.RED)
        return

    table = PrettyTable()
    table.field_names = ["Posts", "Full Name", "Username", "ID"]
    for column in ("Posts", "Full Name", "Username", "ID"):
        table.align[column] = "l"

    pc.printout(
        f"\nWoohoo! We found {len(seen)} ({counter}) users\n",
        pc.GREEN,
    )

    tagged_list = []
    for record in seen.values():
        table.add_row(
            [record["post"], record["full_name"], record["username"], str(record["id"])]
        )
        if self.jsonDump:
            tagged_list.append(record)

    if self.writeFile:
        txt_path = self.output_dir + "/" + self.target + "_tagged.txt"
        with open(txt_path, "w") as handle:
            handle.write(str(table))

    if self.jsonDump:
        json_path = self.output_dir + "/" + self.target + "_tagged.json"
        with open(json_path, "w") as handle:
            json.dump({"tagged": tagged_list}, handle)

    print(table)
937
938
def get_user(self, username):
    """Fetch the profile of ``username`` and return its ``user`` record.

    Prints a message and exits with status 2 when the response carries an
    ``error`` or ``detail`` key. With ``writeFile`` set, the user's ``pk``
    is written to ``<target>_user_id.txt``.
    """
    data = self.api.user_by_username_v2(username)

    failure = None
    if "error" in data:
        failure = "Oops... {error}\n".format(**data)
    elif "detail" in data:
        failure = f"Oops... {self.target} non exist, please enter a valid username ({data['detail']})\n"
    if failure is not None:
        pc.printout(failure, pc.RED)
        exit(2)

    user = data["user"]
    if self.writeFile:
        id_path = self.output_dir + "/" + self.target + "_user_id.txt"
        with open(id_path, "w") as handle:
            handle.write(str(user["pk"]))
    return user
959
960
def set_write_file(self, flag):
    """Enable or disable text-file output and print the new state."""
    state, colour = ("enabled", pc.GREEN) if flag else ("disabled", pc.RED)
    pc.printout("Write to file: ")
    pc.printout(state, colour)
    pc.printout("\n")

    self.writeFile = flag
971
972
def set_json_dump(self, flag):
    """Enable or disable JSON export and print the new state."""
    state, colour = ("enabled", pc.GREEN) if flag else ("disabled", pc.RED)
    pc.printout("Export to JSON: ")
    pc.printout(state, colour)
    pc.printout("\n")

    self.jsonDump = flag
983
984
def to_json(self, python_object):
    """Encode ``bytes`` as a tagged base64 dict; raise TypeError for anything else."""
    if not isinstance(python_object, bytes):
        raise TypeError(repr(python_object) + " is not JSON serializable")
    encoded = codecs.encode(python_object, "base64")
    return {"__class__": "bytes", "__value__": encoded.decode()}
991
992
def from_json(self, json_object):
    """Decode a dict produced by ``to_json`` back to ``bytes``; pass anything else through."""
    tagged_as_bytes = (
        "__class__" in json_object and json_object["__class__"] == "bytes"
    )
    if not tagged_as_bytes:
        return json_object
    payload = json_object["__value__"].encode()
    return codecs.decode(payload, "base64")
996
997
def check_private_profile(self):
    """Return True, after printing a notice, when the target profile is private."""
    if not self.is_private:
        return False
    pc.printout("Impossible to execute command: user has private profile\n", pc.RED)
    return True
1004
1005
def get_contact_info(
    self, func, from_key, to_key, json_key, file_name, title, field, help_text
):
    """Collect one contact field from the users returned by ``func``.

    Parameters:
        func: paginated API call taking ``(target_id, page_id=...)``.
        from_key: key read from each looked-up user's record.
        to_key: key under which the value is stored on the result row.
        json_key: top-level key of the JSON export.
        file_name: suffix of the export files; leading underscores are
            ignored, so ``"_fwersemail"`` and ``"fwersemail"`` both give
            ``<target>_fwersemail.txt`` / ``.json``.
        title: plural noun used in the prompts (for example "emails").
        field: header of the table's last column.
        help_text: progress message printed before fetching.

    Fixes over the previous version: the JSON path was built from the
    already-expanded .txt path when both outputs were enabled; export
    names had a doubled underscore; the table read ``node["id"]`` although
    the sibling listing methods read ``pk`` from the same kind of record
    (both are accepted now); asking for 0 results no longer performs a
    lookup.
    """
    if self.check_private_profile():
        return

    items = []
    pc.printout(f"{help_text}... this can take a few minutes\n")

    next_page_id = ""
    while True:
        pc.printout("@", pc.CYAN)
        result = func(self.target_id, page_id=next_page_id)
        items.extend(result.get("response", {}).get("users", []))
        next_page_id = result.get("next_page_id")
        if not next_page_id:
            break

    print("\n")

    pc.printout(
        f"Do you want to get all {title} (for {len(items)} users)? y/n: ", pc.YELLOW
    )
    answer = input().lower()
    wanted = None  # None means "no cap"
    if answer in ("y", "yes"):
        pass
    elif answer == "":
        print("\n")
        return
    elif answer in ("n", "no"):
        while True:
            pc.printout(f"How many {title} do you want to get? ", pc.YELLOW)
            try:
                wanted = int(input())
                break
            except ValueError:
                pc.printout("Error! Please enter a valid integer!", pc.RED)
                print("\n")
    else:
        pc.printout("Error! Please enter y/n :-)", pc.RED)
        print("\n")
        return

    results = []
    for follow in items:
        # Checked before the lookup so no request is spent once the cap is hit.
        if wanted is not None and len(results) >= wanted:
            break
        pc.printout("@", pc.CYAN)
        user = self.api.user_by_id_v2(follow["pk"])
        if item := (user.get("user") or {}).get(from_key):
            follow[to_key] = item
            results.append(follow)

    if not results:
        pc.printout("\nSorry! No results found :-(\n", pc.RED)
        return

    t = PrettyTable(["ID", "Username", "Full Name", field])
    t.align["ID"] = "l"
    t.align["Username"] = "l"
    t.align["Full Name"] = "l"
    t.align[field] = "l"

    for node in results:
        node_id = node["id"] if "id" in node else node["pk"]
        t.add_row([str(node_id), node["username"], node["full_name"], node[to_key]])

    # Build both paths from one stem; the parameter itself is never rebound.
    stem = self.output_dir + "/" + self.target + "_" + file_name.lstrip("_")

    if self.writeFile:
        with open(stem + ".txt", "w") as f:
            f.write(str(t))

    if self.jsonDump:
        json_data = {json_key: results}
        with open(stem + ".json", "w") as f:
            json.dump(json_data, f)

    print(t)
1089
1090
def get_fwersemail(self):
    """Run ``get_contact_info`` for the public e-mail of the target's followers."""
    options = {
        "from_key": "public_email",
        "to_key": "email",
        "json_key": "followers_email",
        "file_name": "_fwersemail",
        "title": "emails",
        "field": "Email",
        "help_text": "Searching for emails of target followers",
    }
    return self.get_contact_info(self.api.user_followers_v2, **options)
1101
1102
def get_fwingsemail(self):
    """Run ``get_contact_info`` for the public e-mail of the accounts the target follows."""
    options = {
        "from_key": "public_email",
        "to_key": "email",
        "json_key": "followings_email",
        "file_name": "_fwingsemail",
        "title": "emails",
        "field": "Email",
        "help_text": "Searching for emails of users followed by target",
    }
    return self.get_contact_info(self.api.user_following_v2, **options)
1113
1114
def get_fwersnumber(self):
    """Run ``get_contact_info`` for the contact phone number of the target's followers."""
    options = {
        "from_key": "contact_phone_number",
        "to_key": "contact_phone_number",
        "json_key": "followers_phone_numbers",
        "file_name": "_fwersnumber",
        "title": "phone numbers",
        "field": "Phone",
        "help_text": "Searching for phone numbers of users followers",
    }
    return self.get_contact_info(self.api.user_followers_v2, **options)
1125
1126
def get_fwingsnumber(self):
    """Run ``get_contact_info`` for the contact phone number of the accounts the target follows."""
    options = {
        "from_key": "contact_phone_number",
        "to_key": "contact_phone_number",
        "json_key": "followings_phone_numbers",
        "file_name": "_fwingsnumber",
        "title": "phone numbers",
        "field": "Phone",
        "help_text": "Searching for phone numbers of users followed by target",
    }
    return self.get_contact_info(self.api.user_following_v2, **options)
1137
1138
def get_comments(self):
    """Print the text of every comment on the target's posts.

    The previous version kept a ``users`` list that was never filled (the
    code filling it was commented out), so its table/export branch could
    not run and "No results found" was printed even after comments had
    been listed. That dead branch is removed and the message is now
    printed only when no comment was found.
    """
    if self.check_private_profile():
        return

    pc.printout("Searching for users who commented...\n")

    found = 0
    for post in self.__get_feed__():
        for comment in self.__get_comments__(post["id"]):
            print(comment["text"])
            found += 1

    if found == 0:
        pc.printout("Sorry! No results found :-(\n", pc.RED)
1198
1199
def clear_cache(self):
    """Report that there is no cache to clear."""
    notice = "Cache is already empty.\n"
    pc.printout(notice, pc.GREEN)
1201
1202