Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
parkpow
GitHub Repository: parkpow/deep-license-plate-recognition
Path: blob/master/ftp_and_sftp_processor.py
640 views
1
#!/usr/bin/env python
2
import argparse
3
import ftplib
4
import json
5
import logging
6
import os
7
import sys
8
import tempfile
9
import time
10
from abc import ABC, abstractmethod
11
from collections.abc import Callable
12
from datetime import datetime, timedelta
13
from ftplib import FTP, error_perm, error_reply
14
from typing import Any
15
16
import paramiko
17
18
from plate_recognition import recognition_api, save_results
19
20
LOG_LEVEL = os.environ.get("LOGGING", "INFO").upper()
21
22
logging.basicConfig(
23
stream=sys.stdout,
24
level=LOG_LEVEL,
25
style="{",
26
format="{asctime} {levelname} {name} {threadName} : {message}",
27
)
28
29
lgr = logging.getLogger(__name__)
30
31
32
def get_files_and_dirs(
33
func: Callable[[Any, list, list, list], None]
34
) -> Callable[[Any], tuple[list, list, list]]:
35
def wrapper(self):
36
file_list = self.list_files()
37
dirs = []
38
nondirs = []
39
func(self, file_list, dirs, nondirs)
40
return file_list, dirs, nondirs
41
42
return wrapper
43
44
45
class FileTransferProcessor(ABC):
46
def __init__(self, **kwargs):
47
for key, value in kwargs.items():
48
setattr(self, key, value)
49
50
self.processed = []
51
52
@abstractmethod
53
def connect(self):
54
pass
55
56
@abstractmethod
57
def delete_file(self, file):
58
pass
59
60
@abstractmethod
61
def list_files(self):
62
pass
63
64
@abstractmethod
65
def set_ftp_binary_file(self):
66
pass
67
68
@abstractmethod
69
def set_working_directory(self, path):
70
pass
71
72
@abstractmethod
73
def get_working_directory(self):
74
pass
75
76
@abstractmethod
77
def retrieve_files(self):
78
pass
79
80
def processing_single_camera(self, args):
81
self.set_working_directory(args.folder)
82
83
file_list, dirs, nondirs = self.retrieve_files()
84
85
logging.info(
86
"Found %s file(s) in %s.", len(nondirs), self.get_working_directory()
87
)
88
89
# processing files
90
self.process_files(nondirs)
91
92
for folder in dirs:
93
folder_path = f"./{folder}"
94
self.camera_id = folder
95
self.set_working_directory(folder_path)
96
nondirs = []
97
file_list = self.list_files()
98
99
for info in file_list:
100
name = info[-1]
101
ls_type = info[0] if self.os_linux else info[-2]
102
if ls_type.startswith("d") or ls_type == "<DIR>":
103
# Don't process files any deeper
104
pass
105
else:
106
if self.os_linux:
107
nondirs.append(
108
[name, self.parse_date(info[-4], info[-3], info[-2])]
109
)
110
else:
111
file_date = info[0].split("-")
112
file_time = info[1]
113
114
if "AM" in file_time or "PM" in file_time:
115
parsed_time = datetime.strptime(file_time, "%I:%M%p")
116
file_time = parsed_time.strftime(
117
"%H:%M"
118
) # from AM/PM to 24-hour format
119
120
nondirs.append(
121
[
122
name,
123
self.parse_date(
124
file_date[0], file_date[1], file_time, linux=False
125
),
126
]
127
)
128
129
logging.info(
130
"Found %s file(s) in %s.", len(nondirs), self.get_working_directory()
131
)
132
self.process_files(nondirs)
133
134
def track_processed(self):
135
"""
136
Track processed is an interval specified
137
138
:param self: FileTransferProcessor properties context
139
:return: Boolean
140
"""
141
return self.interval and self.interval > 0 and not self.delete
142
143
def manage_processed_file(self, file, last_modified):
144
"""
145
Process a file path:
146
1. Deletes old file in ftp_files from ftp_client
147
148
:param file: file data path
149
:param last_modified: last modified datetime
150
"""
151
152
rm_older_than_date = datetime.now() - timedelta(seconds=self.delete)
153
if rm_older_than_date > last_modified:
154
result = self.delete_file(file)
155
if "error" in result.lower():
156
print(f"file couldn't be deleted: {result}")
157
else:
158
self.processed.remove(file)
159
160
def process_files(self, ftp_files):
161
results = []
162
for file_last_modified in ftp_files:
163
ftp_file = file_last_modified[0]
164
last_modified = file_last_modified[1]
165
166
if self.delete is not None:
167
self.manage_processed_file(ftp_file, last_modified)
168
continue
169
170
logging.info(ftp_file)
171
172
with tempfile.NamedTemporaryFile(
173
suffix="_" + ftp_file, mode="rb+"
174
) as image:
175
176
self.set_ftp_binary_file(ftp_file, image)
177
api_res = recognition_api(
178
image,
179
self.regions,
180
self.api_key,
181
self.sdk_url,
182
camera_id=self.camera_id,
183
timestamp=self.timestamp,
184
mmc=self.mmc,
185
exit_on_error=False,
186
)
187
results.append(api_res)
188
189
if self.track_processed():
190
self.processed.append(ftp_file)
191
192
if self.output_file:
193
save_results(results, self)
194
else:
195
print(json.dumps(results, indent=2))
196
197
def get_month_literal(self, month_number):
198
199
month_mapping = {
200
"01": "jan",
201
"02": "feb",
202
"03": "mar",
203
"04": "apr",
204
"05": "may",
205
"06": "jun",
206
"07": "jul",
207
"08": "aug",
208
"09": "sep",
209
"10": "oct",
210
"11": "nov",
211
"12": "dec",
212
}
213
214
month_literal = month_mapping.get(month_number.lower(), "unknown")
215
return month_literal
216
217
def parse_date(self, x, y, z, linux=True):
218
"""
219
M D T|Y
220
Jan 3 1994
221
Jan 17 1993
222
Sep 13 19:07
223
"""
224
225
if not linux:
226
x = self.get_month_literal(x)
227
228
date_string = f"{x} {int(y):02} {z}"
229
230
if ":" in z:
231
modify_year = True
232
parse_string = "%b %d %H:%M"
233
else:
234
modify_year = False
235
parse_string = "%b %d %Y"
236
237
logging.debug(f"Input Date String: {date_string}")
238
date_time_obj = datetime.strptime(date_string, parse_string)
239
if modify_year:
240
date_time_obj = date_time_obj.replace(year=datetime.now().year)
241
242
logging.debug(f"Parsed date: {date_time_obj}")
243
return date_time_obj
244
245
246
class FTPProcessor(FileTransferProcessor):
247
def __init__(self, **kwargs):
248
super().__init__(**kwargs)
249
self.ftp = None
250
self.os_linux = None
251
252
def connect(self):
253
self.ftp = FTP(timeout=120)
254
self.ftp.connect(self.hostname, self.port)
255
self.ftp.login(self.ftp_user, self.ftp_password)
256
logging.info(f"Connected to FTP server at {self.hostname}")
257
return self.ftp
258
259
def delete_file(self, file):
260
try:
261
response = self.ftp.delete(file)
262
return f"File {file} deleted. Server response: {response}"
263
except error_perm as e:
264
return f"Permission error: {e}"
265
except error_reply as e:
266
return f"Other FTP error: {e}"
267
except Exception as e:
268
return f"An unexpected error occurred: {e}"
269
270
def get_working_directory(self):
271
return self.ftp.pwd()
272
273
def set_working_directory(self, path):
274
try:
275
self.ftp.cwd(path)
276
except ftplib.error_perm as e:
277
print(f"Error 550: {e}")
278
279
def is_linux_os(self, file_list):
280
# check if OS is Linux or Windows
281
for info in file_list:
282
info_first_possition = info[0]
283
if info_first_possition.startswith("d") or info_first_possition.startswith(
284
"-"
285
):
286
return True
287
return False
288
289
def set_ftp_binary_file(self, file, image):
290
"""Retrieve a file in binary transfer mode
291
292
Args:
293
file (String): remote file path
294
image (TemporaryFile): image file destination
295
"""
296
self.ftp.retrbinary("RETR " + file, image.write)
297
298
def list_files(self):
299
file_list = []
300
self.ftp.retrlines("LIST", lambda x: file_list.append(x.split(maxsplit=8)))
301
return file_list
302
303
@get_files_and_dirs
304
def retrieve_files(self, file_list, dirs, nondirs):
305
self.os_linux = self.is_linux_os(file_list)
306
for info in file_list:
307
name = info[-1]
308
ls_type = info[0] if self.os_linux else info[-2]
309
if ls_type.startswith("d") or ls_type == "<DIR>":
310
dirs.append(name)
311
else:
312
if self.os_linux:
313
nondirs.append(
314
[name, self.parse_date(info[-4], info[-3], info[-2])]
315
)
316
else:
317
file_date = info[0].split("-")
318
file_time = info[1]
319
320
if "AM" in file_time or "PM" in file_time:
321
parsed_time = datetime.strptime(file_time, "%I:%M%p")
322
file_time = parsed_time.strftime(
323
"%H:%M"
324
) # from AM/PM to 24-hour format
325
326
nondirs.append(
327
[
328
name,
329
self.parse_date(
330
file_date[0], file_date[1], file_time, linux=False
331
),
332
]
333
)
334
335
336
class SFTPProcessor(FileTransferProcessor):
337
def __init__(self, **kwargs):
338
super().__init__(**kwargs)
339
self.sftp = None
340
self.os_linux = True
341
342
def connect(self):
343
try:
344
ssh = paramiko.SSHClient()
345
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
346
347
if self.ftp_password and not self.pkey:
348
print(self.hostname, self.port, self.ftp_user, self.ftp_password)
349
ssh.connect(
350
self.hostname,
351
port=self.port,
352
username=self.ftp_user,
353
password=self.ftp_password,
354
look_for_keys=False,
355
allow_agent=False,
356
)
357
358
else:
359
try:
360
key = paramiko.RSAKey.from_private_key_file(self.pkey)
361
ssh.connect(self.hostname, self.port, self.ftp_user, pkey=key)
362
except paramiko.AuthenticationException as e:
363
logging.error(f"Authentication failed: {e}")
364
raise
365
except Exception as e:
366
logging.error(f"An unexpected error occurred: {e}")
367
raise
368
369
self.sftp = ssh.open_sftp()
370
logging.info(f"Connected to SFTP server at {self.hostname}")
371
372
except paramiko.AuthenticationException:
373
logging.error("Authentication failed. Please check your credentials.")
374
except paramiko.SSHException as e:
375
logging.error(f"SSH connection error: {e}")
376
except Exception as e:
377
logging.error(f"An unexpected error occurred: {e}")
378
379
return self.sftp
380
381
def delete_file(self, file):
382
try:
383
self.sftp.remove(file)
384
return f"File {file} deleted."
385
except FileNotFoundError as e:
386
return f"File not found: {e}"
387
except OSError as e:
388
return f"An IOError occurred: {e}"
389
except Exception as e:
390
return f"An unexpected error occurred: {e}"
391
392
def get_working_directory(self):
393
return self.sftp.getcwd()
394
395
def set_working_directory(self, path):
396
try:
397
self.sftp.chdir(path)
398
except Exception as e:
399
print(f"Error changing working directory: {e}")
400
401
def set_ftp_binary_file(self, file, image):
402
"""Copy a remote file (remotepath) from the SFTP server and write to an open file or file-like object
403
404
Args:
405
file (String): remote file path
406
image (TemporaryFile): image file destination
407
"""
408
wd = self.get_working_directory()
409
self.sftp.getfo(wd + "/" + file, image)
410
411
def list_files(self):
412
413
file_list = []
414
415
try:
416
file_list_attr = self.sftp.listdir_attr()
417
418
for attr in file_list_attr:
419
file_list.append(str(attr).split())
420
421
for info in file_list:
422
# info format: ['-rw-------', '1', '0', '0', '175289', '16', 'Nov', '18:10', 'demo.jpg']
423
# adapting to linux standard: ['-rw-------', '1', '0', '0', '175289', 'Nov', '16', '18:10', 'demo.jpg']
424
tmp_month = info[-3]
425
info[-3] = info[-4]
426
info[-4] = tmp_month
427
428
except Exception as e:
429
print(f"Error listing files: {e}")
430
return
431
432
return file_list
433
434
@get_files_and_dirs
435
def retrieve_files(self, file_list, dirs, nondirs):
436
437
for info in file_list:
438
name = info[-1]
439
if info[0].startswith("d"):
440
dirs.append(name)
441
else:
442
nondirs.append([name, self.parse_date(info[-4], info[-3], info[-2])])
443
444
445
def parse_arguments(args_hook=lambda _: _):
446
parser = argparse.ArgumentParser(
447
description="Read license plates from the images on an FTP server and output the result as JSON or CSV",
448
epilog="""
449
Examples:\n
450
451
FTP:
452
---
453
Process images on an FTP server:
454
ftp_and_sftp_processor.py -a MY_API_KEY -H host -U user1 -P pass
455
Specify Camera ID and/or two Regions:
456
ftp_and_sftp_processor.py -a MY_API_KEY -H host -U user1 -P pass -f /home/user1 --camera-id Camera1 -r us-ca -r th-37
457
Use the Snapshot SDK instead of the Cloud Api:
458
ftp_and_sftp_processor.py -H host -U user1 -P pass -s http://localhost:8080
459
460
""",
461
formatter_class=argparse.RawTextHelpFormatter,
462
)
463
parser.add_argument("-a", "--api-key", help="Your API key.", required=False)
464
parser.add_argument(
465
"-r",
466
"--regions",
467
help="Match the license plate pattern fo specific region",
468
required=False,
469
action="append",
470
)
471
parser.add_argument(
472
"-s",
473
"--sdk-url",
474
help="Url to self hosted sdk For example, http://localhost:8080",
475
required=False,
476
)
477
parser.add_argument(
478
"--camera-id", help="Name of the source camera.", required=False
479
)
480
481
parser.add_argument(
482
"-c",
483
"--protocol",
484
help="protocol tu use, available choices 'ftp' or 'sftp'",
485
choices="ftp sftp".split(),
486
default="ftp",
487
required=False,
488
)
489
490
args_hook(parser)
491
args = parser.parse_args()
492
493
if not args.api_key:
494
raise Exception("api-key parameter is required")
495
496
return args
497
498
499
def custom_args(parser):
500
parser.epilog += """
501
502
Specify a folder on the FTP server:
503
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1
504
Delete processed files from the FTP server after 10 seconds:
505
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1 -d 10
506
Specify a folder containing dynamic cameras, Sub-folder names are Camera IDs:
507
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass --cameras-root /srv/cameras
508
Periodically check for new files every 10 seconds:
509
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1 -i 10
510
Enable Make Model and Color prediction:
511
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1 --mmc
512
Specify an output file and format for the results:
513
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1 -o data.csv --format csv
514
515
SFTP:
516
----
517
ftp_password login, Process images in /tmp/images:
518
ftp_and_sftp_processor.py -c sftp -U usr1 -P pass -H 192.168.0.59 -f /tmp/images -a 4805bee#########\n
519
Private Key login, Process images in /tmp/images:
520
ftp_and_sftp_processor.py -c sftp -U usr1 --pkey '/home/danleyb2/.ssh/id_rsa' -H 192.168.0.59 -f /tmp/images -a 4805bee#########\n
521
ftp_password login, Process images in /tmp/images using Snapshot SDK:
522
ftp_and_sftp_processor.py -c sftp -U usr1 -P pass -H 192.168.0.59 -f /tmp/images -s http://localhost:8080\n
523
Process images in /tmp/images Periodically every 5 seconds:
524
ftp_and_sftp_processor.py -c sftp -U usr1 -P pass -H 192.168.0.59 -f /tmp/images -a 4805bee######### -i 5
525
"""
526
parser.add_argument("-t", "--timestamp", help="Timestamp.", required=False)
527
parser.add_argument("-H", "--hostname", help="host", required=True)
528
parser.add_argument("-p", "--port", help="port", required=False)
529
parser.add_argument(
530
"-U", "--ftp-user", help="Transfer protocol server user", required=True
531
)
532
parser.add_argument(
533
"-P",
534
"--ftp-password",
535
help="Transfer protocol server user's password",
536
required=False,
537
)
538
parser.add_argument("--pkey", help="SFTP Private Key Path", required=False)
539
parser.add_argument(
540
"-d",
541
"--delete",
542
type=int,
543
help="Remove images from the FTP server after processing. Optionally specify a timeout in seconds.",
544
nargs="?",
545
const=0,
546
)
547
parser.add_argument(
548
"-f",
549
"--folder",
550
help="Specify folder with images on the FTP server.",
551
default="/",
552
)
553
parser.add_argument(
554
"--cameras-root", help="Root folder containing dynamic cameras", required=False
555
)
556
parser.add_argument("-o", "--output-file", help="Save result to file.")
557
parser.add_argument(
558
"--format",
559
help="Format of the result.",
560
default="json",
561
choices="json csv".split(),
562
)
563
parser.add_argument(
564
"--mmc",
565
action="store_true",
566
help="Predict vehicle make and model (SDK only). It has to be enabled.",
567
)
568
parser.add_argument(
569
"-i",
570
"--interval",
571
type=int,
572
help="Periodically fetch new images from the server every interval seconds.",
573
)
574
575
def default_port():
576
return 21 if parser.parse_args().protocol == "ftp" else 22
577
578
parser.set_defaults(port=default_port())
579
580
581
def ftp_process(args):
582
583
args_dict = vars(args)
584
585
if args.protocol == "ftp":
586
file_processor = FTPProcessor(**args_dict)
587
else:
588
if not args.ftp_password and not args.pkey:
589
raise Exception("ftp_password or pkey path are required")
590
file_processor = SFTPProcessor(**args_dict)
591
592
"""
593
for attr, value in file_processor.__dict__.items():
594
print(f"{attr}: {value}")
595
"""
596
597
file_processor.connect()
598
599
if args.cameras_root:
600
file_processor.set_working_directory(args.cameras_root)
601
file_list, dirs, nondirs = file_processor.retrieve_files()
602
for folder in dirs:
603
logging.info(
604
f"Processing Dynamic Camera : {file_processor.get_working_directory()}"
605
)
606
args.folder = os.path.join(args.cameras_root, folder)
607
# The camera id is the folder name
608
args.camera_id = folder
609
610
file_processor.processing_single_camera(args)
611
else:
612
file_processor.processing_single_camera(args)
613
614
615
def main():
616
args = parse_arguments(custom_args)
617
618
if args.interval and args.interval > 0:
619
while True:
620
try:
621
ftp_process(args)
622
except Exception as e:
623
print(f"ERROR: {e}")
624
time.sleep(args.interval)
625
else:
626
ftp_process(args)
627
628
629
if __name__ == "__main__":
630
main()
631
632