GitHub Repository: StevenBlack/hosts
Path: blob/master/updateHostsFile.py
#!/usr/bin/env python3

# Script by Ben Limmer
# https://github.com/l1m5
#
# This Python script will combine all the host files you provide
# as sources into one unique hosts file to keep your internet browsing happy.

import argparse
import fnmatch
import ipaddress
import json
import locale
import os
import platform
from pathlib import Path
import re
import shutil
import socket
import subprocess
import sys
import tempfile
import time
from glob import glob
from typing import Optional, Tuple

# Detecting Python 3 for version-dependent implementations
PY3 = sys.version_info >= (3, 0)

if not PY3:
    raise Exception("We do not support Python 2 anymore.")


try:
    import requests
except ImportError:
    raise ImportError(
        "This project's dependencies have changed. The Requests library ("
        "https://docs.python-requests.org/en/latest/) is now required."
    )


# Syntactic sugar for the "sudo" command in UNIX / Linux
if platform.system() == "OpenBSD":
    SUDO = ["/usr/bin/doas"]
elif platform.system() == "Windows":
    SUDO = ["powershell", "Start-Process", "powershell", "-Verb", "runAs"]
else:
    SUDO = ["/usr/bin/env", "sudo"]

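# For reference: move_hosts_file_into_place() and flush_dns_cache() below
# prepend SUDO to the commands they run, so replacing the hosts file on a
# typical Linux box executes the equivalent of:
#
#     /usr/bin/env sudo cp <generated hosts file> /etc/hosts
#
# while OpenBSD delegates privilege escalation to doas.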

# Project Settings
BASEDIR_PATH = os.path.dirname(os.path.realpath(__file__))


def get_defaults():
    """
    Helper method for getting the default settings.

    Returns
    -------
    default_settings : dict
        A dictionary of the default settings when updating host information.
    """

    return {
        "numberofrules": 0,
        "datapath": path_join_robust(BASEDIR_PATH, "data"),
        "freshen": True,
        "replace": False,
        "backup": False,
        "skipstatichosts": False,
        "keepdomaincomments": True,
        "extensionspath": path_join_robust(BASEDIR_PATH, "extensions"),
        "extensions": [],
        "nounifiedhosts": False,
        "compress": False,
        "minimise": False,
        "outputsubfolder": "",
        "hostfilename": "hosts",
        "targetip": "0.0.0.0",
        "sourcedatafilename": "update.json",
        "sourcesdata": [],
        "readmefilename": "readme.md",
        "readmetemplate": path_join_robust(BASEDIR_PATH, "readme_template.md"),
        "readmedata": {},
        "readmedatafilename": path_join_robust(BASEDIR_PATH, "readmeData.json"),
        "exclusionpattern": r"([a-zA-Z\d-]+\.){0,}",
        "exclusionregexes": [],
        "exclusions": [],
        "commonexclusions": ["hulu.com"],
        "blacklistfile": path_join_robust(BASEDIR_PATH, "blacklist"),
        "whitelistfile": path_join_robust(BASEDIR_PATH, "whitelist"),
    }

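# Note: main() merges these defaults with the parsed command-line options via
# settings.update(options), so a flag such as --ip overrides "targetip" above.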

# End Project Settings


def main():
    parser = argparse.ArgumentParser(
        description="Creates a unified hosts "
        "file from hosts stored in the data subfolders."
    )
    parser.add_argument(
        "--auto",
        "-a",
        dest="auto",
        default=False,
        action="store_true",
        help="Run without prompting.",
    )
    parser.add_argument(
        "--backup",
        "-b",
        dest="backup",
        default=False,
        action="store_true",
        help="Backup the hosts files before they are overridden.",
    )
    parser.add_argument(
        "--extensions",
        "-e",
        dest="extensions",
        default=[],
        nargs="*",
        help="Host extensions to include in the final hosts file.",
    )
    parser.add_argument(
        "--nounifiedhosts",
        dest="nounifiedhosts",
        default=False,
        action="store_true",
        help="Do not include the unified hosts file in the final hosts file. "
        "Usually used together with `--extensions`.",
    )
    parser.add_argument(
        "--ip",
        "-i",
        dest="targetip",
        default="0.0.0.0",
        help="Target IP address. Default is 0.0.0.0.",
    )
    parser.add_argument(
        "--keepdomaincomments",
        "-k",
        dest="keepdomaincomments",
        action="store_false",
        default=True,
        help="Do not keep domain line comments.",
    )
    parser.add_argument(
        "--noupdate",
        "-n",
        dest="noupdate",
        default=False,
        action="store_true",
        help="Don't update from host data sources.",
    )
    parser.add_argument(
        "--skipstatichosts",
        "-s",
        dest="skipstatichosts",
        default=False,
        action="store_true",
        help="Skip static localhost entries in the final hosts file.",
    )
    parser.add_argument(
        "--nogendata",
        "-g",
        dest="nogendata",
        default=False,
        action="store_true",
        help="Skip generation of readmeData.json.",
    )
    parser.add_argument(
        "--output",
        "-o",
        dest="outputsubfolder",
        default="",
        help="Output subfolder for the generated hosts file.",
    )
    parser.add_argument(
        "--replace",
        "-r",
        dest="replace",
        default=False,
        action="store_true",
        help="Replace your active hosts file with this new hosts file.",
    )
    parser.add_argument(
        "--flush-dns-cache",
        "-f",
        dest="flushdnscache",
        default=False,
        action="store_true",
        help="Attempt to flush the DNS cache after replacing the hosts file.",
    )
    parser.add_argument(
        "--compress",
        "-c",
        dest="compress",
        default=False,
        action="store_true",
        help="Compress the hosts file by removing unnecessary lines "
        "(empty lines and comments) and putting multiple domains on "
        "each line. This improves performance under Windows.",
    )
    parser.add_argument(
        "--minimise",
        "-m",
        dest="minimise",
        default=False,
        action="store_true",
        help="Minimise the hosts file by removing unnecessary lines "
        "(empty lines and comments).",
    )
    parser.add_argument(
        "--whitelist",
        "-w",
        dest="whitelistfile",
        default=path_join_robust(BASEDIR_PATH, "whitelist"),
        help="Whitelist file to use while generating hosts files.",
    )
    parser.add_argument(
        "--blacklist",
        "-x",
        dest="blacklistfile",
        default=path_join_robust(BASEDIR_PATH, "blacklist"),
        help="Blacklist file to use while generating hosts files.",
    )

    global settings

    options = vars(parser.parse_args())

    options["outputpath"] = path_join_robust(BASEDIR_PATH, options["outputsubfolder"])
    options["freshen"] = not options["noupdate"]

    settings = get_defaults()
    settings.update(options)

    datapath = settings["datapath"]
    extensionspath = settings["extensionspath"]

    settings["sources"] = list_dir_no_hidden(datapath)
    settings["extensionsources"] = list_dir_no_hidden(extensionspath)

    # All our extensions folders...
    settings["extensions"] = [
        os.path.basename(item) for item in list_dir_no_hidden(extensionspath)
    ]
    # ... intersected with the extensions passed in as arguments, then sorted.
    settings["extensions"] = sorted(
        list(set(options["extensions"]).intersection(settings["extensions"]))
    )

    auto = settings["auto"]
    exclusionregexes = settings["exclusionregexes"]
    sourcedatafilename = settings["sourcedatafilename"]
    nounifiedhosts = settings["nounifiedhosts"]

    updatesources = prompt_for_update(freshen=settings["freshen"], updateauto=auto)
    if updatesources:
        update_all_sources(sourcedatafilename, settings["hostfilename"])

    gatherexclusions = prompt_for_exclusions(skipprompt=auto)

    if gatherexclusions:
        commonexclusions = settings["commonexclusions"]
        exclusionpattern = settings["exclusionpattern"]
        exclusionregexes = display_exclusion_options(
            commonexclusions=commonexclusions,
            exclusionpattern=exclusionpattern,
            exclusionregexes=exclusionregexes,
        )

    extensions = settings["extensions"]
    sourcesdata = update_sources_data(
        settings["sourcesdata"],
        datapath=datapath,
        extensions=extensions,
        extensionspath=extensionspath,
        sourcedatafilename=sourcedatafilename,
        nounifiedhosts=nounifiedhosts,
    )

    mergefile = create_initial_file(
        nounifiedhosts=nounifiedhosts,
    )
    remove_old_hosts_file(settings["outputpath"], "hosts", settings["backup"])
    if settings["compress"]:
        finalfile = open(path_join_robust(settings["outputpath"], "hosts"), "w+b")
        compressedfile = tempfile.NamedTemporaryFile()
        remove_dups_and_excl(mergefile, exclusionregexes, compressedfile)
        compress_file(compressedfile, settings["targetip"], finalfile)
    elif settings["minimise"]:
        finalfile = open(path_join_robust(settings["outputpath"], "hosts"), "w+b")
        minimisedfile = tempfile.NamedTemporaryFile()
        remove_dups_and_excl(mergefile, exclusionregexes, minimisedfile)
        minimise_file(minimisedfile, settings["targetip"], finalfile)
    else:
        finalfile = remove_dups_and_excl(mergefile, exclusionregexes)

    numberofrules = settings["numberofrules"]
    outputsubfolder = settings["outputsubfolder"]
    skipstatichosts = settings["skipstatichosts"]

    write_opening_header(
        finalfile,
        extensions=extensions,
        numberofrules=numberofrules,
        outputsubfolder=outputsubfolder,
        skipstatichosts=skipstatichosts,
        nounifiedhosts=nounifiedhosts,
    )
    finalfile.close()

    if not settings["nogendata"]:
        update_readme_data(
            settings["readmedatafilename"],
            extensions=extensions,
            numberofrules=numberofrules,
            outputsubfolder=outputsubfolder,
            sourcesdata=sourcesdata,
            nounifiedhosts=nounifiedhosts,
        )

    print_success(
        "Success! The hosts file has been saved in folder "
        + "./"
        + outputsubfolder
        + "\nIt contains "
        + "{:,}".format(numberofrules)
        + " unique entries."
    )

    movefile = prompt_for_move(
        finalfile,
        auto=auto,
        replace=settings["replace"],
        skipstatichosts=skipstatichosts,
    )

    # We only flush the DNS cache if we have
    # moved a new hosts file into place.
    if movefile:
        prompt_for_flush_dns_cache(
            flushcache=settings["flushdnscache"], promptflush=not auto
        )


# Prompt the User
def prompt_for_update(freshen, updateauto):
    """
    Prompt the user to update all hosts files.

    If requested, the function will update all data sources after it
    checks that a hosts file does indeed exist.

    Parameters
    ----------
    freshen : bool
        Whether data sources should be updated. This function will return
        if it is requested that data sources not be updated.
    updateauto : bool
        Whether or not to automatically update all data sources.

    Returns
    -------
    updatesources : bool
        Whether or not we should update data sources for exclusion files.
    """

    # Create a hosts file if it doesn't exist.
    hostsfile = path_join_robust(BASEDIR_PATH, "hosts")

    if not os.path.isfile(hostsfile):
        try:
            open(hostsfile, "w+").close()
        except (IOError, OSError):
            # Since Python 3.3, IOError is an alias of OSError;
            # we catch both to stay on the safe side.
            print_failure(
                "ERROR: No 'hosts' file in the folder. Try creating one manually."
            )

    if not freshen:
        return False

    prompt = "Do you want to update all data sources?"

    if updateauto or query_yes_no(prompt):
        return True
    elif not updateauto:
        print("OK, we'll stick with what we've got locally.")

    return False

def prompt_for_exclusions(skipprompt):
    """
    Prompt the user to exclude any custom domains from being blocked.

    Parameters
    ----------
    skipprompt : bool
        Whether or not to skip prompting for custom domains to be excluded.
        If true, the function returns immediately.

    Returns
    -------
    gatherexclusions : bool
        Whether or not we should proceed to prompt the user to exclude any
        custom domains beyond those in the whitelist.
    """

    prompt = (
        "Do you want to exclude any domains?\n"
        "For example, hulu.com video streaming must be able to access "
        "its tracking and ad servers in order to play video."
    )

    if not skipprompt:
        if query_yes_no(prompt):
            return True
        else:
            print("OK, we'll only exclude domains in the whitelist.")

    return False

def prompt_for_flush_dns_cache(flushcache, promptflush):
    """
    Prompt the user to flush the DNS cache.

    Parameters
    ----------
    flushcache : bool
        Whether to flush the DNS cache without prompting.
    promptflush : bool
        If `flushcache` is False, whether we should prompt for flushing the
        cache. Otherwise, the function returns immediately.
    """

    if flushcache:
        flush_dns_cache()
    elif promptflush:
        if query_yes_no("Attempt to flush the DNS cache?"):
            flush_dns_cache()

def prompt_for_move(finalfile, **moveparams):
    """
    Prompt the user to move the newly created hosts file to its designated
    location in the OS.

    Parameters
    ----------
    finalfile : file
        The file object that contains the newly created hosts data.
    moveparams : kwargs
        Dictionary providing additional parameters for moving the hosts file
        into place. Currently, those fields are:

            1) auto
            2) replace
            3) skipstatichosts

    Returns
    -------
    movefile : bool
        Whether or not the final hosts file was moved.
    """

    skipstatichosts = moveparams["skipstatichosts"]

    if moveparams["replace"] and not skipstatichosts:
        movefile = True
    elif moveparams["auto"] or skipstatichosts:
        movefile = False
    else:
        prompt = "Do you want to replace your existing hosts file with the newly generated file?"
        movefile = query_yes_no(prompt)

    if movefile:
        movefile = move_hosts_file_into_place(finalfile)

    return movefile


# End Prompt the User

def sort_sources(sources):
    """
    Sort the sources.

    The idea is that all of Steven Black's lists, files, or entries
    are placed on top, and the rest are sorted alphabetically.

    Parameters
    ----------
    sources : list
        The sources to sort.
    """

    result = sorted(
        sources.copy(),
        key=lambda x: x.lower().replace("-", "").replace("_", "").replace(" ", ""),
    )

    # Steven Black's repositories/files/lists should be on top!
    stevenblackpositions = [
        x for x, y in enumerate(result) if "stevenblack" in y.lower()
    ]

    for index in stevenblackpositions:
        result.insert(0, result.pop(index))

    return result

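# Illustrative ordering (hypothetical paths): sort_sources() turns
#   ["data/adaway/update.json", "data/StevenBlack/update.json"]
# into
#   ["data/StevenBlack/update.json", "data/adaway/update.json"]
# because entries containing "stevenblack" are moved to the front.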

# Exclusion logic
def display_exclusion_options(commonexclusions, exclusionpattern, exclusionregexes):
    """
    Display the exclusion options to the user.

    This function checks whether a user wants to exclude particular domains,
    and if so, excludes them.

    Parameters
    ----------
    commonexclusions : list
        A list of common domains that are excluded from being blocked. One
        example is Hulu. This setting is set directly in the script and cannot
        be overwritten by the user.
    exclusionpattern : str
        The exclusion pattern with which to create the domain regex.
    exclusionregexes : list
        The list of regex patterns used to exclude domains.

    Returns
    -------
    aug_exclusionregexes : list
        The original list of regex patterns, potentially with additional
        patterns from domains that the user chooses to exclude.
    """

    for exclusionoption in commonexclusions:
        prompt = "Do you want to exclude the domain " + exclusionoption + " ?"

        if query_yes_no(prompt):
            exclusionregexes = exclude_domain(
                exclusionoption, exclusionpattern, exclusionregexes
            )

    if query_yes_no("Do you want to exclude any other domains?"):
        exclusionregexes = gather_custom_exclusions(
            exclusionpattern, exclusionregexes
        )

    return exclusionregexes


def gather_custom_exclusions(exclusionpattern, exclusionregexes):
    """
    Gather custom exclusions from the user.

    Parameters
    ----------
    exclusionpattern : str
        The exclusion pattern with which to create the domain regex.
    exclusionregexes : list
        The list of regex patterns used to exclude domains.

    Returns
    -------
    aug_exclusionregexes : list
        The original list of regex patterns, potentially with additional
        patterns from domains that the user chooses to exclude.
    """

    # We continue running this while-loop until the user
    # says that they have no more domains to exclude.
    while True:
        domainprompt = "Enter the domain you want to exclude (e.g. facebook.com): "
        userdomain = input(domainprompt)

        if is_valid_user_provided_domain_format(userdomain):
            exclusionregexes = exclude_domain(
                userdomain, exclusionpattern, exclusionregexes
            )

        continueprompt = "Do you have more domains you want to enter?"
        if not query_yes_no(continueprompt):
            break

    return exclusionregexes


def exclude_domain(domain, exclusionpattern, exclusionregexes):
    """
    Exclude a domain from being blocked.

    This creates the domain regex by which to exclude this domain and appends
    it to a list of already-existing exclusion regexes.

    Parameters
    ----------
    domain : str
        The filename or regex pattern to exclude.
    exclusionpattern : str
        The exclusion pattern with which to create the domain regex.
    exclusionregexes : list
        The list of regex patterns used to exclude domains.

    Returns
    -------
    aug_exclusionregexes : list
        The original list of regex patterns with one additional pattern from
        the `domain` input.
    """

    exclusionregex = re.compile(exclusionpattern + domain)
    exclusionregexes.append(exclusionregex)

    return exclusionregexes

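# For example, excluding "hulu.com" with the default exclusion pattern
# r"([a-zA-Z\d-]+\.){0,}" compiles a regex that also matches subdomains
# such as "ads.hulu.com".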

def matches_exclusions(strippedrule, exclusionregexes):
    """
    Check whether a rule matches an exclusion rule we already provided.

    If this function returns True, this rule should be excluded from the
    final hosts file.

    Parameters
    ----------
    strippedrule : str
        The rule that we are checking.
    exclusionregexes : list
        The list of regex patterns used to exclude domains.

    Returns
    -------
    matches_exclusion : bool
        Whether or not the rule string matches a provided exclusion.
    """

    try:
        strippeddomain = strippedrule.split()[1]
    except IndexError:
        # Example: 'example.org' instead of '0.0.0.0 example.org'
        strippeddomain = strippedrule

    for exclusionregex in exclusionregexes:
        if exclusionregex.search(strippeddomain):
            return True

    return False

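# Example: if "hulu.com" was excluded via exclude_domain() above, then
#   matches_exclusions("0.0.0.0 ads.hulu.com", exclusionregexes) -> True
#   matches_exclusions("0.0.0.0 example.org", exclusionregexes)  -> False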

# End Exclusion Logic


# Update Logic
def update_sources_data(sourcesdata, **sourcesparams):
    """
    Update the sources data and information for each source.

    Parameters
    ----------
    sourcesdata : list
        The list of sources data that we are to update.
    sourcesparams : kwargs
        Dictionary providing additional parameters for updating the
        sources data. Currently, those fields are:

            1) datapath
            2) extensions
            3) extensionspath
            4) sourcedatafilename
            5) nounifiedhosts

    Returns
    -------
    update_sources_data : list
        The original source data list with new source data appended.
    """

    sourcedatafilename = sourcesparams["sourcedatafilename"]

    if not sourcesparams["nounifiedhosts"]:
        for source in sort_sources(
            recursive_glob(sourcesparams["datapath"], sourcedatafilename)
        ):
            updatefile = open(source, "r", encoding="UTF-8")
            try:
                updatedata = json.load(updatefile)
                sourcesdata.append(updatedata)
            finally:
                updatefile.close()

    for source in sourcesparams["extensions"]:
        sourcedir = path_join_robust(sourcesparams["extensionspath"], source)
        for updatefile_path in sort_sources(
            recursive_glob(sourcedir, sourcedatafilename)
        ):
            updatefile = open(updatefile_path, "r", encoding="UTF-8")
            try:
                updatedata = json.load(updatefile)
                sourcesdata.append(updatedata)
            finally:
                updatefile.close()

    return sourcesdata


def jsonarray(json_array_string):
    """
    Transformer: convert a JSON array string of hosts into one host per
    line, prefixing each line with "127.0.0.1 ".

    Parameters
    ----------
    json_array_string : str
        The JSON array string, in the form
        '["example1.com", "example2.com", ...]'
    """

    templist = json.loads(json_array_string)
    hostlines = "127.0.0.1 " + "\n127.0.0.1 ".join(templist)
    return hostlines

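# Example:
#   jsonarray('["example1.com", "example2.com"]')
# returns
#   '127.0.0.1 example1.com\n127.0.0.1 example2.com'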

def update_all_sources(sourcedatafilename, hostfilename):
    """
    Update all host files, regardless of folder depth.

    Parameters
    ----------
    sourcedatafilename : str
        The name of the file where information regarding updating
        sources for a particular URL is stored. This filename is assumed
        to be the same for all sources.
    hostfilename : str
        The name of the file in which the updated source information
        is stored for a particular URL. This filename is assumed to be
        the same for all sources.
    """

    # The transforms we support
    transformmethods = {"jsonarray": jsonarray}

    allsources = sort_sources(recursive_glob("*", sourcedatafilename))

    for source in allsources:
        updatefile = open(source, "r", encoding="UTF-8")
        updatedata = json.load(updatefile)
        updatefile.close()

        # We can pause updating any given hosts source.
        # If the update.json "pause" key is missing, don't pause.
        if updatedata.get("pause", False):
            continue

        updateurl = updatedata["url"]
        update_transforms = []
        if updatedata.get("transforms"):
            update_transforms = updatedata["transforms"]

        print("Updating source " + os.path.dirname(source) + " from " + updateurl)

        try:
            updatedfile = get_file_by_url(updateurl)

            # Spin the transforms as required.
            for transform in update_transforms:
                updatedfile = transformmethods[transform](updatedfile)

            # Get rid of carriage-return symbols.
            updatedfile = updatedfile.replace("\r", "")

            hostsfile = open(
                path_join_robust(BASEDIR_PATH, os.path.dirname(source), hostfilename),
                "wb",
            )
            write_data(hostsfile, updatedfile)
            hostsfile.close()
        except Exception:
            print("Error in updating source: ", updateurl)

# End Update Logic


# File Logic
def create_initial_file(**initial_file_params):
    """
    Initialize the file in which we merge all host files for later pruning.

    Parameters
    ----------
    initial_file_params : kwargs
        Dictionary providing additional parameters for populating the initial
        file. Currently, those fields are:

            1) nounifiedhosts
    """

    mergefile = tempfile.NamedTemporaryFile()

    if not initial_file_params["nounifiedhosts"]:
        # Spin through the sources for the base file.
        for source in sort_sources(
            recursive_glob(settings["datapath"], settings["hostfilename"])
        ):
            start = "# Start {}\n\n".format(os.path.basename(os.path.dirname(source)))
            end = "\n# End {}\n\n".format(os.path.basename(os.path.dirname(source)))

            with open(source, "r", encoding="UTF-8") as curFile:
                write_data(mergefile, start + curFile.read() + end)

    # Spin through the sources for extensions to the base file.
    for source in settings["extensions"]:
        for filename in sort_sources(
            recursive_glob(
                path_join_robust(settings["extensionspath"], source),
                settings["hostfilename"],
            )
        ):
            with open(filename, "r", encoding="UTF-8") as curFile:
                write_data(mergefile, curFile.read())

    maybe_copy_example_file(settings["blacklistfile"])

    if os.path.isfile(settings["blacklistfile"]):
        with open(settings["blacklistfile"], "r") as curFile:
            write_data(mergefile, curFile.read())

    return mergefile


def compress_file(inputfile, targetip, outputfile):
    """
    Reduce the file size by removing unnecessary lines (empty lines and
    comments) and putting multiple domains on each line.

    Reducing the number of lines in the file makes parsing under
    Microsoft Windows much faster.

    Parameters
    ----------
    inputfile : file
        The file object that contains the hostnames that we are reducing.
    targetip : str
        The target IP address.
    outputfile : file
        The file object that will contain the reduced hostnames.
    """

    inputfile.seek(0)  # reset file pointer
    write_data(outputfile, "\n")

    targetip_len = len(targetip)
    lines = [targetip]
    lines_index = 0
    for line in inputfile.readlines():
        line = line.decode("UTF-8")

        if line.startswith(targetip):
            if lines[lines_index].count(" ") < 9:
                lines[lines_index] += (
                    " " + line[targetip_len : line.find("#")].strip()  # noqa: E203
                )
            else:
                lines[lines_index] += "\n"
                lines.append(line[: line.find("#")].strip())
                lines_index += 1

    for line in lines:
        write_data(outputfile, line)

    inputfile.close()

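# The compressed layout packs up to nine hostnames after the target IP on a
# single line, e.g. (illustrative domains):
#   0.0.0.0 a.com b.com c.com d.com e.com f.com g.com h.com i.com
# which Windows parses much faster than one hostname per line.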

def minimise_file(inputfile, targetip, outputfile):
    """
    Reduce the file size by removing unnecessary lines (empty lines and
    comments).

    Parameters
    ----------
    inputfile : file
        The file object that contains the hostnames that we are reducing.
    targetip : str
        The target IP address.
    outputfile : file
        The file object that will contain the reduced hostnames.
    """

    inputfile.seek(0)  # reset file pointer
    write_data(outputfile, "\n")

    lines = []
    for line in inputfile.readlines():
        line = line.decode("UTF-8")

        if line.startswith(targetip):
            lines.append(line[: line.find("#")].strip() + "\n")

    for line in lines:
        write_data(outputfile, line)

    inputfile.close()


def remove_dups_and_excl(mergefile, exclusionregexes, outputfile=None):
    """
    Remove duplicates and remove hosts that we are excluding.

    We check for duplicate hostnames as well as remove any hostnames that
    have been explicitly excluded by the user.

    Parameters
    ----------
    mergefile : file
        The file object that contains the hostnames that we are pruning.
    exclusionregexes : list
        The list of regex patterns used to exclude domains.
    outputfile : file
        The file object in which the result is written. If None, the file
        'settings["outputpath"]' will be created.
    """

    numberofrules = settings["numberofrules"]
    maybe_copy_example_file(settings["whitelistfile"])

    if os.path.isfile(settings["whitelistfile"]):
        with open(settings["whitelistfile"], "r") as ins:
            for line in ins:
                line = line.strip(" \t\n\r")
                if line and not line.startswith("#"):
                    settings["exclusions"].append(line)

    if not os.path.exists(settings["outputpath"]):
        os.makedirs(settings["outputpath"])

    if outputfile is None:
        finalfile = open(path_join_robust(settings["outputpath"], "hosts"), "w+b")
    else:
        finalfile = outputfile

    # Analyze any post.json here.
    post_json_path = os.path.join(os.path.dirname(finalfile.name), "post.json")
    filters = []
    if os.path.isfile(post_json_path):
        try:
            with open(post_json_path, "r", encoding="UTF-8") as post_file:
                post_data = json.load(post_file)
                filters = post_data.get("filters", [])
        except Exception as e:
            print_failure(f"Error reading post.json: {e}")

    mergefile.seek(0)  # reset file pointer
    hostnames = {"localhost", "localhost.localdomain", "local", "broadcasthost"}
    exclusions = settings["exclusions"]

    for line in mergefile.readlines():
        write_line = True

        # Explicit encoding
        line = line.decode("UTF-8")

        # Apply post.json filters.
        if filters and any(f in line for f in filters):
            continue

        # Replace any run of tabs with a single space.
        line = re.sub(r"\t+", " ", line)

        # See gh-271: trim trailing whitespace and periods.
        line = line.rstrip(" .")

        # Pass comment lines and whitespace-only lines through unchanged.
        if not line or line[0] == "#" or line[0].isspace():
            write_data(finalfile, line)
            continue
        if "::1" in line:
            continue

        strippedrule = strip_rule(line)  # strip comments
        if not strippedrule or matches_exclusions(strippedrule, exclusionregexes):
            continue

        # Issue #1628
        if "@" in strippedrule:
            continue

        # Normalize rule.
        hostname, normalized_rule = normalize_rule(
            strippedrule,
            targetip=settings["targetip"],
            keep_domain_comments=settings["keepdomaincomments"],
        )

        for exclude in exclusions:
            if re.search(r"(^|[\s\.])" + re.escape(exclude) + r"\s", line):
                write_line = False
                break

        if normalized_rule and (hostname not in hostnames) and write_line:
            write_data(finalfile, normalized_rule)
            hostnames.add(hostname)
            numberofrules += 1

    settings["numberofrules"] = numberofrules
    mergefile.close()

    if outputfile is None:
        return finalfile


def normalize_rule(rule, targetip, keep_domain_comments):
    """
    Standardize and format the rule string provided.

    Parameters
    ----------
    rule : str
        The rule whose spelling and spacing we are standardizing.
    targetip : str
        The target IP address for the rule.
    keep_domain_comments : bool
        Whether or not to keep comments regarding these domains in
        the normalized rule.

    Returns
    -------
    normalized_rule : tuple
        A tuple of the hostname and the rule string with spelling
        and spacing reformatted.
    """

    def normalize_response(
        extracted_hostname: str, extracted_suffix: Optional[str]
    ) -> Tuple[str, str]:
        """
        Normalize the response from the extracted hostname and suffix,
        if they exist.

        Parameters
        ----------
        extracted_hostname : str
            The extracted hostname to work with.
        extracted_suffix : str
            The extracted suffix to work with.

        Returns
        -------
        normalized_response : tuple
            A tuple of the hostname and the rule string with spelling
            and spacing reformatted.
        """

        rule = "%s %s" % (targetip, extracted_hostname)

        if keep_domain_comments and extracted_suffix:
            if not extracted_suffix.strip().startswith("#"):
                # Strings are stripped, therefore we need to add the space back.
                rule += " # %s" % extracted_suffix
            else:
                rule += " %s" % extracted_suffix

        return extracted_hostname, rule + "\n"

    def is_ip(dataset: str) -> bool:
        """
        Check whether the given dataset is an IP address.

        Parameters
        ----------
        dataset : str
            The dataset to work with.

        Returns
        -------
        is_ip : bool
            Whether the dataset is an IP address.
        """

        try:
            _ = ipaddress.ip_address(dataset)
            return True
        except ValueError:
            return False

    def belch_unwanted(unwanted: str) -> Tuple[None, None]:
        """
        Belch the unwanted rule to the screen.

        Parameters
        ----------
        unwanted : str
            The unwanted string to belch.

        Returns
        -------
        belched : tuple
            A tuple of (None, None).
        """

        # Finally, if we get here, just belch to screen.
        print("==>%s<==" % unwanted)
        return None, None

    # First try: IP followed by domain.
    static_ip_regex = r"^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})$"
    split_rule = rule.split(maxsplit=1)

    if is_ip(split_rule[0]):
        # Assume that the first item is an IP address and that the rest
        # of the rule follows it.

        if " " in split_rule[-1] or "\t" in split_rule[-1]:
            try:
                # Example: 0.0.0.0 example.org # hello, world!
                hostname, suffix = split_rule[-1].split(maxsplit=1)
            except ValueError:
                # Example: 0.0.0.0 example.org[:space:]
                hostname, suffix = split_rule[-1], None
        else:
            # Example: 0.0.0.0 example.org
            hostname, suffix = split_rule[-1], None

        hostname = hostname.lower()

        if (
            is_ip(hostname)
            or re.search(static_ip_regex, hostname)
            or "." not in hostname
            or ".." in hostname
            or "." in hostname[-1]
            or "/" in hostname
            or ":" in hostname
        ):
            # Example: 0.0.0.0 127.0.0.1

            # If the hostname:
            #   - is an IP address (or looks like one),
            #   - doesn't contain dots,
            #   - contains repeated dots,
            #   - ends in a dot,
            #   - contains a slash, or
            #   - contains a colon,
            # we don't want to normalize it.
            return belch_unwanted(rule)

        return normalize_response(hostname, suffix)

    if (
        not re.search(static_ip_regex, split_rule[0])
        and ":" not in split_rule[0]
        and ".." not in split_rule[0]
        and "/" not in split_rule[0]
        and "." in split_rule[0]
    ):
        # Second try: a bare domain. Deny anything that looks like an IP,
        # doesn't contain dots, or is otherwise invalid.

        try:
            hostname, suffix = split_rule
        except ValueError:
            hostname, suffix = split_rule[0], None

        hostname = hostname.lower()

        return normalize_response(hostname, suffix)

    return belch_unwanted(rule)

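# Examples of normalize_rule() behavior:
#   normalize_rule("0.0.0.0 Example.ORG # tracker", "0.0.0.0", True)
#       -> ("example.org", "0.0.0.0 example.org # tracker\n")
#   normalize_rule("example.org", "0.0.0.0", False)
#       -> ("example.org", "0.0.0.0 example.org\n")
#   normalize_rule("0.0.0.0 127.0.0.1", "0.0.0.0", True)
#       -> (None, None), since the "hostname" is itself an IP address.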

def strip_rule(line):
    """
    Sanitize a rule string before writing it to the output hosts file.

    Parameters
    ----------
    line : str
        The rule to sanitize.

    Returns
    -------
    sanitized_line : str
        The sanitized rule.
    """

    return " ".join(line.split())

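# Example: strip_rule("0.0.0.0\t\texample.org    # comment") returns
# "0.0.0.0 example.org # comment", with all whitespace runs collapsed to
# single spaces.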

def write_opening_header(finalfile, **headerparams):
    """
    Write the header information into the newly-created hosts file.

    Parameters
    ----------
    finalfile : file
        The file object that points to the newly-created hosts file.
    headerparams : kwargs
        Dictionary providing additional parameters for populating the header
        information. Currently, those fields are:

            1) extensions
            2) numberofrules
            3) outputsubfolder
            4) skipstatichosts
            5) nounifiedhosts
    """

    finalfile.seek(0)  # Reset file pointer.
    file_contents = finalfile.read()  # Save content.

    finalfile.seek(0)  # Write at the top.

    nounifiedhosts = headerparams["nounifiedhosts"]

    if headerparams["extensions"]:
        if nounifiedhosts:
            if len(headerparams["extensions"]) > 1:
                write_data(
                    finalfile,
                    "# Title: StevenBlack/hosts extensions {0} and {1}\n#\n".format(
                        ", ".join(headerparams["extensions"][:-1]),
                        headerparams["extensions"][-1],
                    ),
                )
            else:
                write_data(
                    finalfile,
                    "# Title: StevenBlack/hosts extension {0}\n#\n".format(
                        ", ".join(headerparams["extensions"])
                    ),
                )
        else:
            if len(headerparams["extensions"]) > 1:
                write_data(
                    finalfile,
                    "# Title: StevenBlack/hosts with the {0} and {1} extensions\n#\n".format(
                        ", ".join(headerparams["extensions"][:-1]),
                        headerparams["extensions"][-1],
                    ),
                )
            else:
                write_data(
                    finalfile,
                    "# Title: StevenBlack/hosts with the {0} extension\n#\n".format(
                        ", ".join(headerparams["extensions"])
                    ),
                )
    else:
        write_data(finalfile, "# Title: StevenBlack/hosts\n#\n")

    write_data(
        finalfile,
        "# This hosts file is a merged collection "
        "of hosts from reputable sources,\n",
    )
    write_data(finalfile, "# with a dash of crowd sourcing via GitHub\n#\n")
    write_data(
        finalfile,
        "# Date: " + time.strftime("%d %B %Y %H:%M:%S (%Z)", time.gmtime()) + "\n",
    )

    if headerparams["extensions"]:
        if headerparams["nounifiedhosts"]:
            write_data(
                finalfile,
                "# The unified hosts file was not used while generating this file.\n"
                "# Extensions used to generate this file: "
                + ", ".join(headerparams["extensions"])
                + "\n",
            )
        else:
            write_data(
                finalfile,
                "# Extensions added to this file: "
                + ", ".join(headerparams["extensions"])
                + "\n",
            )

    write_data(
        finalfile,
        (
            "# Number of unique domains: {:,}\n#\n".format(
                headerparams["numberofrules"]
            )
        ),
    )
    write_data(
        finalfile,
        "# Fetch the latest version of this file: "
        "https://raw.githubusercontent.com/StevenBlack/hosts/master/"
        + path_join_robust(headerparams["outputsubfolder"], "").replace("\\", "/")
        + "hosts\n",
    )
    write_data(
        finalfile, "# Project home page: https://github.com/StevenBlack/hosts\n"
    )
    write_data(
        finalfile,
        "# Project releases: https://github.com/StevenBlack/hosts/releases\n#\n",
    )
    write_data(
        finalfile,
        "# ===============================================================\n",
    )
    write_data(finalfile, "\n")

    if not headerparams["skipstatichosts"]:
        write_data(finalfile, "127.0.0.1 localhost\n")
        write_data(finalfile, "127.0.0.1 localhost.localdomain\n")
        write_data(finalfile, "127.0.0.1 local\n")
        write_data(finalfile, "255.255.255.255 broadcasthost\n")
        write_data(finalfile, "::1 localhost\n")
        write_data(finalfile, "::1 ip6-localhost\n")
        write_data(finalfile, "::1 ip6-loopback\n")
        write_data(finalfile, "fe80::1%lo0 localhost\n")
        write_data(finalfile, "ff00::0 ip6-localnet\n")
        write_data(finalfile, "ff00::0 ip6-mcastprefix\n")
        write_data(finalfile, "ff02::1 ip6-allnodes\n")
        write_data(finalfile, "ff02::2 ip6-allrouters\n")
        write_data(finalfile, "ff02::3 ip6-allhosts\n")
        write_data(finalfile, "0.0.0.0 0.0.0.0\n")

        if platform.system() == "Linux":
            write_data(finalfile, "127.0.1.1 " + socket.gethostname() + "\n")
            write_data(finalfile, "127.0.0.53 " + socket.gethostname() + "\n")

        write_data(finalfile, "\n")

    preamble = path_join_robust(BASEDIR_PATH, "myhosts")
    maybe_copy_example_file(preamble)

    if os.path.isfile(preamble):
        with open(preamble, "r") as f:
            write_data(finalfile, f.read())

    finalfile.write(file_contents)


def update_readme_data(readme_file, **readme_updates):
    """
    Update the host and website information provided in the README JSON data.

    Parameters
    ----------
    readme_file : str
        The name of the README file to update.
    readme_updates : kwargs
        Dictionary providing additional JSON fields to update before
        saving the data. Currently, those fields are:

            1) extensions
            2) sourcesdata
            3) numberofrules
            4) outputsubfolder
            5) nounifiedhosts
    """

    extensions_key = "base"
    extensions = readme_updates["extensions"]
    nounifiedhosts = readme_updates["nounifiedhosts"]

    if extensions:
        extensions_key = "-".join(extensions)
        if nounifiedhosts:
            extensions_key = extensions_key + "-only"

    output_folder = readme_updates["outputsubfolder"]
    generation_data = {
        "location": path_join_robust(output_folder, ""),
        "nounifiedhosts": nounifiedhosts,
        "entries": readme_updates["numberofrules"],
        "sourcesdata": readme_updates["sourcesdata"],
    }

    with open(readme_file, "r") as f:
        readme_data = json.load(f)
        readme_data[extensions_key] = generation_data

    for denomination, data in readme_data.copy().items():
        if "location" in data and data["location"] and "\\" in data["location"]:
            # Windows compatibility: #1166
            readme_data[denomination]["location"] = data["location"].replace("\\", "/")

    with open(readme_file, "w") as f:
        json.dump(readme_data, f)


def move_hosts_file_into_place(finalfile):
    r"""
    Move the newly-created hosts file into its correct location on the OS.

    For UNIX systems, the hosts file is "/etc/hosts". On Windows, it's
    "C:\Windows\System32\drivers\etc\hosts".

    For this move to work, you must have administrator privileges.
    On UNIX systems, this means having "sudo" access, and on Windows, it
    means being able to run command prompt in administrator mode.

    Parameters
    ----------
    finalfile : file object
        The newly-created hosts file to move.
    """  # noqa: W605

    filename = os.path.abspath(finalfile.name)

    try:
        if not Path(filename).exists():
            raise FileNotFoundError
    except Exception:
        print_failure(f"{filename} does not exist.")
        return False

    if platform.system() == "Windows":
        target_file = str(
            Path(os.getenv("SystemRoot")) / "system32" / "drivers" / "etc" / "hosts"
        )
    else:
        target_file = "/etc/hosts"

    if os.getenv("IN_CONTAINER"):
        # It's not allowed to remove/replace a mounted /etc/hosts, so we replace the content.
        # This requires running the container user as root, as is the default.
        print(f"Running in container, so we will replace the content of {target_file}.")
        try:
            with open(target_file, "w") as target_stream:
                with open(filename, "r") as source_stream:
                    source = source_stream.read()
                    target_stream.write(source)
            return True
        except Exception:
            print_failure(f"Replacing content of {target_file} failed.")
            return False
    elif (
        platform.system() == "Linux"
        or platform.system() == "Windows"
        or platform.system() == "Darwin"
    ):
        print(
            f"Replacing {target_file} requires root privileges. You might need to enter your password."
        )
        try:
            subprocess.run(SUDO + ["cp", filename, target_file], check=True)
            return True
        except subprocess.CalledProcessError:
            print_failure(f"Replacing {target_file} failed.")
            return False


def flush_dns_cache():
    """
    Flush the DNS cache.
    """

    print("Flushing the DNS cache to utilize new hosts file...")
    print(
        "Flushing the DNS cache requires administrative privileges. You might need to enter your password."
    )

    dns_cache_found = False

    if platform.system() == "Darwin":
        if subprocess.call(SUDO + ["killall", "-HUP", "mDNSResponder"]):
            print_failure("Flushing the DNS cache failed.")
    elif os.name == "nt":
        print("Automatically flushing the DNS cache is not yet supported.")
        print(
            "Please copy and paste the command 'ipconfig /flushdns' in "
            "administrator command prompt after running this script."
        )
    else:
        nscd_prefixes = ["/etc", "/etc/rc.d"]
        nscd_msg = "Flushing the DNS cache by restarting nscd {result}"

        for nscd_prefix in nscd_prefixes:
            nscd_cache = nscd_prefix + "/init.d/nscd"

            if os.path.isfile(nscd_cache):
                dns_cache_found = True

                if subprocess.call(SUDO + [nscd_cache, "restart"]):
                    print_failure(nscd_msg.format(result="failed"))
                else:
                    print_success(nscd_msg.format(result="succeeded"))

        centos_file = "/etc/init.d/network"
        centos_msg = "Flushing the DNS cache by restarting network {result}"

        if os.path.isfile(centos_file):
            if subprocess.call(SUDO + [centos_file, "restart"]):
                print_failure(centos_msg.format(result="failed"))
            else:
                print_success(centos_msg.format(result="succeeded"))

        system_prefixes = ["/usr", ""]
        service_types = ["NetworkManager", "wicd", "dnsmasq", "networking"]
        restarted_services = []

        for system_prefix in system_prefixes:
            systemctl = system_prefix + "/bin/systemctl"
            system_dir = system_prefix + "/lib/systemd/system"

            for service_type in service_types:
                service = service_type + ".service"
                if service in restarted_services:
                    continue

                service_file = path_join_robust(system_dir, service)
                service_msg = (
                    "Flushing the DNS cache by restarting " + service + " {result}"
                )

                if os.path.isfile(service_file):
                    if 0 != subprocess.call(
                        [systemctl, "status", service], stdout=subprocess.DEVNULL
                    ):
                        continue
                    dns_cache_found = True

                    if subprocess.call(SUDO + [systemctl, "restart", service]):
                        print_failure(service_msg.format(result="failed"))
                    else:
                        print_success(service_msg.format(result="succeeded"))
                    restarted_services.append(service)

        dns_clean_file = "/etc/init.d/dns-clean"
        dns_clean_msg = "Flushing the DNS cache via dns-clean executable {result}"

        if os.path.isfile(dns_clean_file):
            dns_cache_found = True

            if subprocess.call(SUDO + [dns_clean_file, "start"]):
                print_failure(dns_clean_msg.format(result="failed"))
            else:
                print_success(dns_clean_msg.format(result="succeeded"))

        if not dns_cache_found:
            print_failure("Unable to determine DNS management tool.")


def remove_old_hosts_file(path_to_file, file_name, backup):
    """
    Remove the old hosts file.

    This is a hotfix because merging with an already existing hosts file leads
    to artifacts and duplicates.

    Parameters
    ----------
    path_to_file : str
        The path of the directory containing the hosts file.
    file_name : str
        The name of the hosts file.
    backup : boolean, default False
        Whether or not to backup the existing hosts file.
    """

    fullfilepath = path_join_robust(path_to_file, file_name)

    if os.path.exists(fullfilepath):
        if backup:
            backupfilepath = fullfilepath + "-{}".format(
                time.strftime("%Y-%m-%d-%H-%M-%S")
            )

            # Make a backup copy, marking the date on which the list was updated.
            shutil.copy(fullfilepath, backupfilepath)

        os.remove(fullfilepath)

    # Create the directory if it doesn't exist.
    if not os.path.exists(path_to_file):
        os.makedirs(path_to_file)

    # Create a new empty hosts file.
    open(fullfilepath, "a").close()


# End File Logic


def domain_to_idna(line):
    """
    Encode the domain present in a line into `idna`. This way, we
    avoid most encoding issues.

    Parameters
    ----------
    line : str
        The line we have to encode/decode.

    Returns
    -------
    line : str
        The line in a converted format.

    Notes
    -----
    - This function encodes only the domain to `idna` format because in
      most cases, the encoding issue is due to a domain which looks like
      `b'\xc9\xa2oogle.com'.decode('idna')`.
    - About the splitting:
      We split because we only want to encode the domain and not the full
      line, which may cause some issues. Keep in mind that we split, but we
      still concatenate once we encoded the domain.

        - The following splits the prefix `0.0.0.0` or `127.0.0.1` off a line.
        - The following also splits off the trailing comment of a given line.
    """

    if not line.startswith("#"):
        tabs = "\t"
        space = " "

        tabsposition, spaceposition = (line.find(tabs), line.find(space))

        if tabsposition > -1 and spaceposition > -1:
            if spaceposition < tabsposition:
                separator = space
            else:
                separator = tabs
        elif not tabsposition == -1:
            separator = tabs
        elif not spaceposition == -1:
            separator = space
        else:
            separator = ""

        if separator:
            split_line = line.split(separator)

            try:
                index = 1
                while index < len(split_line):
                    if split_line[index]:
                        break
                    index += 1

                if "#" in split_line[index]:
                    index_comment = split_line[index].find("#")

                    if index_comment > -1:
                        comment = split_line[index][index_comment:]

                        split_line[index] = (
                            split_line[index]
                            .split(comment)[0]
                            .encode("IDNA")
                            .decode("UTF-8")
                            + comment
                        )

                split_line[index] = split_line[index].encode("IDNA").decode("UTF-8")
            except IndexError:
                pass
            return separator.join(split_line)
        return line.encode("IDNA").decode("UTF-8")
    return line.encode("UTF-8").decode("UTF-8")

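# Example: domain_to_idna("0.0.0.0 bücher.example") returns
# "0.0.0.0 xn--bcher-kva.example"; only the domain portion is IDNA-encoded.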

# Helper Functions
def maybe_copy_example_file(file_path):
    """
    Given a file path, copy over its ".example" version if the path doesn't
    exist.

    If the path does exist, nothing happens in this function.

    If the path doesn't exist and the ".example" file doesn't exist either,
    nothing happens in this function.

    Parameters
    ----------
    file_path : str
        The full file path to check.
    """

    if not os.path.isfile(file_path):
        examplefilepath = file_path + ".example"
        if os.path.isfile(examplefilepath):
            shutil.copyfile(examplefilepath, file_path)


def get_file_by_url(url, params=None, **kwargs):
    """
    Retrieve the contents of the hosts file at the URL, then pass it
    through domain_to_idna().

    Parameters are passed to the requests.get() function.

    Parameters
    ----------
    url : str or bytes
        URL for the new Request object.
    params :
        Dictionary, list of tuples, or bytes to send in the query string
        for the Request.
    kwargs :
        Optional arguments that request takes.

    Returns
    -------
    url_data : str or None
        The data retrieved at that URL from the file. Returns None if the
        attempted retrieval is unsuccessful.
    """

    try:
        req = requests.get(url=url, params=params, **kwargs)
    except requests.exceptions.RequestException:
        print("Error retrieving data from {}".format(url))
        return None

    req.encoding = req.apparent_encoding
    res_text = "\n".join([domain_to_idna(line) for line in req.text.split("\n")])
    return res_text


def write_data(f, data):
    """
    Write data to a file object.

    Parameters
    ----------
    f : file
        The file object at which to write the data.
    data : str
        The data to write to the file.
    """

    f.write(bytes(data, "UTF-8"))


def list_dir_no_hidden(path):
    """
    List all files in a directory, except for hidden files.

    Parameters
    ----------
    path : str
        The path of the directory whose files we wish to list.
    """

    return glob(os.path.join(path, "*"))


def query_yes_no(question, default="yes"):
    """
    Ask a yes/no question via input() and get an answer from the user.

    Inspired by the following implementation:

    https://code.activestate.com/recipes/577058/

    Parameters
    ----------
    question : str
        The question presented to the user.
    default : str, default "yes"
        The presumed answer if the user just hits <Enter>. It must be "yes",
        "no", or None (meaning an answer is required of the user).

    Returns
    -------
    yes : bool
        Whether or not the user replied yes to the question.
    """

    valid = {"yes": "yes", "y": "yes", "ye": "yes", "no": "no", "n": "no"}
    prompt = {None: " [y/n] ", "yes": " [Y/n] ", "no": " [y/N] "}.get(default, None)

    if not prompt:
        raise ValueError("invalid default answer: '%s'" % default)

    reply = None

    while not reply:
        sys.stdout.write(colorize(question, Colors.PROMPT) + prompt)

        choice = input().lower()
        reply = None

        if default and not choice:
            reply = default
        elif choice in valid:
            reply = valid[choice]
        else:
            print_failure("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")

    return reply == "yes"


def is_valid_user_provided_domain_format(domain):
    """
    Check whether a provided domain is valid.

    Parameters
    ----------
    domain : str
        The domain against which to check.

    Returns
    -------
    valid_domain : bool
        Whether or not the domain provided is valid.
    """

    if domain == "":
        print("You didn't enter a domain. Try again.")
        return False

    domain_regex = re.compile(r"www\d{0,3}[.]|https?")

    if domain_regex.match(domain):
        print(
            "The domain " + domain + " is not valid. Do not include "
            "www.domain.com or http(s)://domain.com. Try again."
        )
        return False
    else:
        return True


def recursive_glob(stem, file_pattern):
    """
    Recursively match files in a directory according to a pattern.

    Parameters
    ----------
    stem : str
        The directory in which to recurse.
    file_pattern : str
        The filename pattern against which to match.

    Returns
    -------
    matches_list : list
        A list of filenames in the directory that match the file pattern.
    """

    if sys.version_info >= (3, 5):
        return glob(stem + "/**/" + file_pattern, recursive=True)
    else:
        # gh-316: this will avoid invalid unicode comparisons in Python 2.x
        if stem == str("*"):
            stem = "."
        matches = []
        for root, dirnames, filenames in os.walk(stem):
            for filename in fnmatch.filter(filenames, file_pattern):
                matches.append(path_join_robust(root, filename))
        return matches

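# Example (hypothetical layout): recursive_glob("data", "update.json") returns
# every update.json at any depth under data/, e.g.
#   ["data/adaway/update.json", "data/StevenBlack/update.json"]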

def path_join_robust(path, *paths):
    """
    Wrapper around `os.path.join` with handling for locale issues.

    Parameters
    ----------
    path : str
        The first path to join.
    paths : varargs
        Subsequent path strings to join.

    Returns
    -------
    joined_path : str
        The joined path string of the path inputs.

    Raises
    ------
    locale.Error : A locale issue was detected that prevents path joining.
    """

    try:
        # gh-316: joining unicode and str can be saddening in Python 2.x
        path = str(path)
        paths = [str(another_path) for another_path in paths]

        return os.path.join(path, *paths)
    except UnicodeDecodeError as e:
        raise locale.Error(
            "Unable to construct path. This is likely a LOCALE issue:\n\n" + str(e)
        )


# Colors
class Colors(object):
    PROMPT = "\033[94m"
    SUCCESS = "\033[92m"
    FAIL = "\033[91m"
    ENDC = "\033[0m"


def supports_color():
    """
    Check whether the running terminal or command prompt supports color.

    Inspired by the following StackOverflow link (and the Django
    implementation):

    https://stackoverflow.com/questions/7445658

    Returns
    -------
    colors_supported : bool
        Whether the running terminal or command prompt supports color.
    """

    sys_platform = sys.platform
    supported = sys_platform != "Pocket PC" and (
        sys_platform != "win32" or "ANSICON" in os.environ
    )

    atty_connected = hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
    return supported and atty_connected


def colorize(text, color):
    """
    Wrap a string so that it displays in a particular color.

    This function adds a prefix and suffix to a text string so that it is
    displayed as a particular color, either in command prompt or the terminal.

    If the running terminal or command prompt does not support color, the
    original text is returned without being wrapped.

    Parameters
    ----------
    text : str
        The message to display.
    color : str
        The color string prefix to put before the text.

    Returns
    -------
    wrapped_str : str
        The wrapped string to display in color, if possible.
    """

    if not supports_color():
        return text

    return color + text + Colors.ENDC


def print_success(text):
    """
    Print a success message.

    Parameters
    ----------
    text : str
        The message to display.
    """

    print(colorize(text, Colors.SUCCESS))


def print_failure(text):
    """
    Print a failure message.

    Parameters
    ----------
    text : str
        The message to display.
    """

    print(colorize(text, Colors.FAIL))


# End Helper Functions


if __name__ == "__main__":
    main()