Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
parkpow
GitHub Repository: parkpow/deep-license-plate-recognition
Path: blob/master/ftp_and_sftp_processor.py
1089 views
1
#!/usr/bin/env python
2
import argparse
3
import ftplib
4
import json
5
import logging
6
import os
7
import sys
8
import tempfile
9
import time
10
from abc import ABC, abstractmethod
11
from collections.abc import Callable
12
from datetime import datetime, timedelta
13
from ftplib import FTP, error_perm, error_reply
14
from typing import Any
15
16
import paramiko
17
18
from plate_recognition import recognition_api, save_results
19
20
# Verbosity is taken from the LOGGING environment variable (INFO when unset).
LOG_LEVEL = os.environ.get("LOGGING", "INFO").upper()

# Root logger: records go to stdout, formatted with "{"-style placeholders.
logging.basicConfig(
    format="{asctime} {levelname} {name} {threadName} : {message}",
    style="{",
    level=LOG_LEVEL,
    stream=sys.stdout,
)

# Logger named after this module.
lgr = logging.getLogger(__name__)
30
31
32
def get_files_and_dirs(
    func: Callable[[Any, list, list, list], None],
) -> Callable[[Any], tuple[list, list, list]]:
    """Turn a listing classifier into a zero-argument ``retrieve_files`` method.

    The decorated method is written as ``func(self, file_list, dirs, nondirs)``
    and is expected to fill ``dirs`` and ``nondirs`` in place. The returned
    wrapper fetches the listing with ``self.list_files()``, calls ``func`` and
    returns the tuple ``(file_list, dirs, nondirs)``.

    :param func: method that sorts the listing rows into ``dirs``/``nondirs``
    :return: wrapper taking only ``self``
    """
    # Local import so the decorator does not rely on a new file-level import.
    from functools import wraps

    @wraps(func)  # keep the decorated method's name and docstring
    def wrapper(self):
        # list_files() may return None when the listing failed; treat that as
        # an empty directory instead of crashing while iterating over None.
        file_list = self.list_files() or []
        dirs: list = []
        nondirs: list = []
        func(self, file_list, dirs, nondirs)
        return file_list, dirs, nondirs

    return wrapper
43
44
45
class FileTransferProcessor(ABC):
    """Base class for processors that read images from a remote file server.

    Every keyword argument passed to the constructor is stored as an instance
    attribute. The methods below read ``delete``, ``interval``, ``regions``,
    ``api_key``, ``sdk_url``, ``camera_id``, ``timestamp``, ``mmc`` and
    ``output_file`` from the instance, plus ``os_linux`` which tells how the
    listing rows returned by ``list_files`` are laid out.
    """

    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

        # Names of files already sent for recognition (see track_processed).
        self.processed = []

    @abstractmethod
    def connect(self):
        """Open the connection to the remote server."""

    @abstractmethod
    def delete_file(self, file):
        """Delete ``file`` remotely and return a status message (str)."""

    @abstractmethod
    def list_files(self):
        """Return the current directory listing as a list of column lists."""

    @abstractmethod
    def set_ftp_binary_file(self, file, image):
        """Download remote ``file`` into the open binary file object ``image``."""

    @abstractmethod
    def set_working_directory(self, path):
        """Change the remote working directory to ``path``."""

    @abstractmethod
    def get_working_directory(self):
        """Return the remote working directory."""

    @abstractmethod
    def retrieve_files(self):
        """Return ``(file_list, dirs, nondirs)`` for the working directory."""

    def processing_single_camera(self, args):
        """Process the images of ``args.folder`` and of its direct sub-folders.

        Files located directly in the folder are processed first. Every direct
        sub-folder is then entered, its name becomes ``self.camera_id`` and its
        files are processed; anything deeper is ignored.

        :param args: object exposing a ``folder`` attribute (remote path)
        """
        self.set_working_directory(args.folder)

        _, dirs, nondirs = self.retrieve_files()
        base_dir = self.get_working_directory()

        logging.info("Found %s file(s) in %s.", len(nondirs), base_dir)

        # processing files
        self.process_files(nondirs)

        for folder in dirs:
            if folder in (".", ".."):
                # Never walk back up or re-process the folder itself.
                continue

            self.camera_id = folder
            self.set_working_directory(f"./{folder}")
            current_dir = self.get_working_directory()
            if current_dir == base_dir:
                # The implementations in this file report a failed directory
                # change without raising; do not process the parent twice.
                logging.warning("Could not enter folder %s, skipping it.", folder)
                continue

            try:
                nondirs = self._files_from_listing(self.list_files() or [])
                logging.info("Found %s file(s) in %s.", len(nondirs), current_dir)
                self.process_files(nondirs)
            finally:
                # Go back so the next "./<folder>" is resolved from the parent
                # and not from inside the folder that was just processed.
                self.set_working_directory(base_dir)

    def _files_from_listing(self, file_list):
        """Return ``[name, modified]`` pairs for the non-directory rows."""
        files = []
        for info in file_list:
            name = info[-1]
            ls_type = info[0] if self.os_linux else info[-2]
            if ls_type.startswith("d") or ls_type == "<DIR>":
                # Don't process files any deeper
                continue

            if self.os_linux:
                modified = self.parse_date(info[-4], info[-3], info[-2])
            else:
                file_date = info[0].split("-")
                file_time = info[1]
                if "AM" in file_time or "PM" in file_time:
                    # from AM/PM to 24-hour format
                    file_time = datetime.strptime(file_time, "%I:%M%p").strftime(
                        "%H:%M"
                    )
                # NOTE(review): only the first two parts of the date column
                # are forwarded, so its year part is not used here.
                modified = self.parse_date(
                    file_date[0], file_date[1], file_time, linux=False
                )
            files.append([name, modified])
        return files

    def track_processed(self):
        """
        Tell whether processed file names have to be remembered.

        True when a positive ``interval`` is set and ``delete`` is falsy
        (``None`` or ``0``).

        :return: Boolean
        """
        return bool(self.interval and self.interval > 0 and not self.delete)

    def manage_processed_file(self, file, last_modified):
        """
        Delete ``file`` from the server once it is older than ``self.delete``
        seconds, and forget it in ``self.processed`` when it was tracked.

        :param file: file data path
        :param last_modified: last modified datetime
        """
        rm_older_than_date = datetime.now() - timedelta(seconds=self.delete)
        if rm_older_than_date <= last_modified:
            return

        result = self.delete_file(file)
        if "error" in result.lower():
            print(f"file couldn't be deleted: {result}")
        elif file in self.processed:
            # The file may never have been tracked (tracking is optional), so
            # only remove it when present instead of raising ValueError.
            self.processed.remove(file)

    def process_files(self, ftp_files):
        """Run recognition on every entry of ``ftp_files`` and output results.

        :param ftp_files: iterable of ``[name, last_modified]`` entries

        Results are handed to ``save_results`` when ``self.output_file`` is
        set, otherwise printed as indented JSON.
        """
        results = []
        for entry in ftp_files:
            ftp_file, last_modified = entry[0], entry[1]

            if self.delete is not None:
                # NOTE(review): with ``delete`` set, the entry is only checked
                # for deletion and is not sent for recognition in this pass --
                # confirm this is the intended flow.
                self.manage_processed_file(ftp_file, last_modified)
                continue

            logging.info(ftp_file)

            with tempfile.NamedTemporaryFile(
                suffix="_" + ftp_file, mode="rb+"
            ) as image:
                self.set_ftp_binary_file(ftp_file, image)
                api_res = recognition_api(
                    image,
                    self.regions,
                    self.api_key,
                    self.sdk_url,
                    camera_id=self.camera_id,
                    timestamp=self.timestamp,
                    mmc=self.mmc,
                    exit_on_error=False,
                )
                results.append(api_res)

            if self.track_processed():
                self.processed.append(ftp_file)

        if self.output_file:
            save_results(results, self)
        else:
            print(json.dumps(results, indent=2))

    def get_month_literal(self, month_number):
        """Map a two-digit month (``"01"``..``"12"``) to ``"jan"``..``"dec"``.

        :return: the three-letter month, or ``"unknown"`` for any other input
        """
        month_mapping = {
            "01": "jan",
            "02": "feb",
            "03": "mar",
            "04": "apr",
            "05": "may",
            "06": "jun",
            "07": "jul",
            "08": "aug",
            "09": "sep",
            "10": "oct",
            "11": "nov",
            "12": "dec",
        }
        return month_mapping.get(month_number.lower(), "unknown")

    def parse_date(self, x, y, z, linux=True):
        """
        Build a datetime from the date columns of a listing row.

        M   D   T|Y
        Jan 3   1994
        Jan 17  1993
        Sep 13  19:07

        :param x: month (abbreviated name, or two-digit number if not linux)
        :param y: day of the month
        :param z: either a year or an ``HH:MM`` time
        :param linux: False when ``x`` is a two-digit month number
        :return: naive datetime
        :raises ValueError: when the columns cannot be parsed
        """
        if not linux:
            x = self.get_month_literal(x)

        date_string = f"{x} {int(y):02} {z}"
        logging.debug(f"Input Date String: {date_string}")

        if ":" in z:
            date_time_obj = self._resolve_recent_date(date_string)
        else:
            date_time_obj = datetime.strptime(date_string, "%b %d %Y")

        logging.debug(f"Parsed date: {date_time_obj}")
        return date_time_obj

    @staticmethod
    def _resolve_recent_date(date_string):
        """Attach the most recent plausible year to a ``Mon DD HH:MM`` date."""
        now = datetime.now()
        # One day of slack for a clock/time-zone offset with the server.
        latest_allowed = now + timedelta(days=1)
        # Leap years are at most 8 years apart, so 9 candidates always contain
        # a year in which "Feb 29" exists.
        for year in range(now.year, now.year - 9, -1):
            try:
                candidate = datetime.strptime(
                    f"{year} {date_string}", "%Y %b %d %H:%M"
                )
            except ValueError:
                # e.g. "Feb 29" does not exist in this candidate year
                continue
            if candidate <= latest_allowed:
                # A date in the future means the entry is from a previous
                # year, so the first candidate that is not ahead of now wins.
                return candidate
        raise ValueError(f"Unable to parse listing date: {date_string!r}")
242
243
244
class FTPProcessor(FileTransferProcessor):
    """Processor that reads images from an FTP server through ``ftplib``."""

    # A usable LIST row has at least: date, time, size|<DIR>, name.
    _MIN_COLUMNS = 4

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.ftp = None
        # Set by retrieve_files(): truthy when the listing rows start with a
        # Unix permission string, falsy for the "<DIR>" (Windows) layout.
        self.os_linux = None

    def connect(self):
        """Connect and log in; return the ``ftplib.FTP`` client."""
        self.ftp = FTP(timeout=120)
        self.ftp.connect(self.hostname, self.port)
        self.ftp.login(self.ftp_user, self.ftp_password)
        logging.info(f"Connected to FTP server at {self.hostname}")
        return self.ftp

    def close(self):
        """Terminate the FTP session, if one is open."""
        if self.ftp is None:
            return
        try:
            self.ftp.quit()
        except ftplib.all_errors:
            # The polite QUIT failed (e.g. connection already dropped); just
            # release the socket.
            self.ftp.close()
        finally:
            self.ftp = None

    def delete_file(self, file):
        """Delete ``file`` on the server and return a status message.

        Failures are reported in the returned text instead of being raised.
        """
        try:
            response = self.ftp.delete(file)
        except error_perm as e:
            return f"Permission error: {e}"
        except error_reply as e:
            return f"Other FTP error: {e}"
        except Exception as e:
            return f"An unexpected error occurred: {e}"
        return f"File {file} deleted. Server response: {response}"

    def get_working_directory(self):
        """Return the server's current directory (``PWD``)."""
        return self.ftp.pwd()

    def set_working_directory(self, path):
        """Change directory; a permission error is printed, not raised."""
        try:
            self.ftp.cwd(path)
        except ftplib.error_perm as e:
            print(f"Error 550: {e}")

    def is_linux_os(self, file_list):
        """Tell whether the listing uses the Unix (``ls -l`` like) layout.

        :param file_list: rows as returned by ``list_files``
        :return: True when any row starts with ``d`` or ``-``; False
            otherwise, including for an empty listing
        """
        # Look at every row: the first one is not necessarily a file entry
        # (it can be empty or a summary line).
        return any(info and info[0].startswith(("d", "-")) for info in file_list)

    def set_ftp_binary_file(self, file, image):
        """Retrieve a file in binary transfer mode

        Args:
            file (String): remote file path
            image (TemporaryFile): image file destination
        """
        self.ftp.retrbinary("RETR " + file, image.write)

    def list_files(self):
        """Return the ``LIST`` rows of the current directory, split in columns.

        Each row is split on whitespace into at most 9 columns. Rows with
        fewer than ``_MIN_COLUMNS`` columns (blank or summary lines) are
        dropped because no file entry can be read from them.
        """
        file_list = []

        def collect(line):
            columns = line.split(maxsplit=8)
            if len(columns) >= self._MIN_COLUMNS:
                file_list.append(columns)

        self.ftp.retrlines("LIST", collect)
        return file_list

    def _modified_from_row(self, info):
        """Return the modification datetime described by one listing row."""
        if self.os_linux:
            return self.parse_date(info[-4], info[-3], info[-2])

        file_date = info[0].split("-")
        file_time = info[1]
        if "AM" in file_time or "PM" in file_time:
            # from AM/PM to 24-hour format
            file_time = datetime.strptime(file_time, "%I:%M%p").strftime("%H:%M")
        return self.parse_date(file_date[0], file_date[1], file_time, linux=False)

    @get_files_and_dirs
    def retrieve_files(self, file_list, dirs, nondirs):
        """Sort the listing rows into directory names and ``[name, date]`` files.

        Also records the detected listing layout in ``self.os_linux``.
        """
        self.os_linux = self.is_linux_os(file_list)
        for info in file_list:
            if len(info) < self._MIN_COLUMNS:
                continue
            name = info[-1]
            ls_type = info[0] if self.os_linux else info[-2]
            if ls_type.startswith("d") or ls_type == "<DIR>":
                dirs.append(name)
            else:
                nondirs.append([name, self._modified_from_row(info)])
331
332
333
class SFTPProcessor(FileTransferProcessor):
    """Processor that reads images from an SFTP server through ``paramiko``."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.sftp = None
        # Underlying SSH client, kept so it can be closed with the session.
        self._ssh = None
        # list_files() always produces rows in the Unix column layout.
        self.os_linux = True

    def connect(self):
        """Open the SSH connection and the SFTP session.

        Password authentication is used when ``ftp_password`` is set and
        ``pkey`` is not; otherwise the RSA private key file ``pkey`` is used.

        :return: the paramiko SFTP client
        :raises Exception: any connection/authentication error is logged and
            re-raised, since nothing can be processed without a session
        """
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        try:
            if self.ftp_password and not self.pkey:
                # Credentials are deliberately not echoed to stdout.
                ssh.connect(
                    self.hostname,
                    port=self.port,
                    username=self.ftp_user,
                    password=self.ftp_password,
                    look_for_keys=False,
                    allow_agent=False,
                )
            else:
                key = paramiko.RSAKey.from_private_key_file(self.pkey)
                ssh.connect(self.hostname, self.port, self.ftp_user, pkey=key)

            self.sftp = ssh.open_sftp()
        except paramiko.AuthenticationException:
            logging.error("Authentication failed. Please check your credentials.")
            ssh.close()
            raise
        except paramiko.SSHException as e:
            logging.error(f"SSH connection error: {e}")
            ssh.close()
            raise
        except Exception as e:
            logging.error(f"An unexpected error occurred: {e}")
            ssh.close()
            raise

        self._ssh = ssh
        logging.info(f"Connected to SFTP server at {self.hostname}")
        return self.sftp

    def close(self):
        """Close the SFTP session and the SSH connection, if open."""
        if self.sftp is not None:
            self.sftp.close()
            self.sftp = None
        if self._ssh is not None:
            self._ssh.close()
            self._ssh = None

    def delete_file(self, file):
        """Remove ``file`` on the server and return a status message.

        Failures are reported in the returned text instead of being raised.
        """
        try:
            self.sftp.remove(file)
        except FileNotFoundError as e:
            return f"File not found: {e}"
        except OSError as e:
            return f"An IOError occurred: {e}"
        except Exception as e:
            return f"An unexpected error occurred: {e}"
        return f"File {file} deleted."

    def get_working_directory(self):
        """Return the session's current directory as reported by paramiko."""
        return self.sftp.getcwd()

    def set_working_directory(self, path):
        """Change directory; any failure is printed, not raised."""
        try:
            self.sftp.chdir(path)
        except Exception as e:
            print(f"Error changing working directory: {e}")

    def set_ftp_binary_file(self, file, image):
        """Copy a remote file (remotepath) from the SFTP server and write to an open file or file-like object

        Args:
            file (String): remote file path
            image (TemporaryFile): image file destination
        """
        wd = self.get_working_directory()
        # No working directory recorded yet: use the name as given rather
        # than failing on ``None + "/"``.
        remote_path = file if wd is None else wd + "/" + file
        self.sftp.getfo(remote_path, image)

    def list_files(self):
        """Return the current directory listing in the Unix column layout.

        Each entry's text form is split on whitespace into at most 9 columns,
        so a file name containing spaces stays in the last column. On any
        error the message is printed and an empty list is returned.
        """
        file_list = []

        try:
            for attr in self.sftp.listdir_attr():
                info = str(attr).split(maxsplit=8)
                # info format: ['-rw-------', '1', '0', '0', '175289', '16', 'Nov', '18:10', 'demo.jpg']
                # adapting to linux standard: ['-rw-------', '1', '0', '0', '175289', 'Nov', '16', '18:10', 'demo.jpg']
                info[-4], info[-3] = info[-3], info[-4]
                file_list.append(info)
        except Exception as e:
            print(f"Error listing files: {e}")
            return []

        return file_list

    @get_files_and_dirs
    def retrieve_files(self, file_list, dirs, nondirs):
        """Sort the listing rows into directory names and ``[name, date]`` files."""
        for info in file_list:
            name = info[-1]
            if info[0].startswith("d"):
                dirs.append(name)
            else:
                nondirs.append([name, self.parse_date(info[-4], info[-3], info[-2])])
438
439
440
def parse_arguments(args_hook=lambda _: _):
    """Build the command line parser, parse ``sys.argv`` and return the result.

    :param args_hook: callable receiving the parser before parsing, so that
        extra arguments can be registered
    :return: the parsed ``argparse.Namespace``
    :raises Exception: when neither an API key nor an SDK URL is given
    """
    parser = argparse.ArgumentParser(
        description="Read license plates from the images on an FTP server and output the result as JSON or CSV",
        epilog="""
Examples:\n

FTP:
---
Process images on an FTP server:
ftp_and_sftp_processor.py -a MY_API_KEY -H host -U user1 -P pass
Specify Camera ID and/or two Regions:
ftp_and_sftp_processor.py -a MY_API_KEY -H host -U user1 -P pass -f /home/user1 --camera-id Camera1 -r us-ca -r th-37
Use the Snapshot SDK instead of the Cloud Api:
ftp_and_sftp_processor.py -H host -U user1 -P pass -s http://localhost:8080

""",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("-a", "--api-key", help="Your API key.", required=False)
    parser.add_argument(
        "-r",
        "--regions",
        help="Match the license plate pattern fo specific region",
        required=False,
        action="append",
    )
    parser.add_argument(
        "-s",
        "--sdk-url",
        help="Url to self hosted sdk For example, http://localhost:8080",
        required=False,
    )
    parser.add_argument(
        "--camera-id", help="Name of the source camera.", required=False
    )

    parser.add_argument(
        "-c",
        "--protocol",
        help="protocol tu use, available choices 'ftp' or 'sftp'",
        choices=["ftp", "sftp"],
        default="ftp",
        required=False,
    )

    args_hook(parser)
    args = parser.parse_args()

    # The API key is only needed for the cloud API: the help text above
    # documents running against a self hosted SDK with -s and no key.
    if not args.api_key and not args.sdk_url:
        raise Exception("api-key parameter is required")

    return args
492
493
494
def custom_args(parser):
    """Register the FTP/SFTP specific options on ``parser``.

    Extends the parser's epilog with more examples, adds the connection and
    processing options and sets the default port from the selected protocol
    (21 for ``ftp``, 22 otherwise).

    :param parser: ``argparse.ArgumentParser`` that already defines
        ``--protocol`` and has a string epilog
    """
    parser.epilog += """

Specify a folder on the FTP server:
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1
Delete processed files from the FTP server after 10 seconds:
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1 -d 10
Specify a folder containing dynamic cameras, Sub-folder names are Camera IDs:
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass --cameras-root /srv/cameras
Periodically check for new files every 10 seconds:
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1 -i 10
Enable Make Model and Color prediction:
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1 --mmc
Specify an output file and format for the results:
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1 -o data.csv --format csv

SFTP:
----
ftp_password login, Process images in /tmp/images:
ftp_and_sftp_processor.py -c sftp -U usr1 -P pass -H 192.168.0.59 -f /tmp/images -a 4805bee#########\n
Private Key login, Process images in /tmp/images:
ftp_and_sftp_processor.py -c sftp -U usr1 --pkey '/home/danleyb2/.ssh/id_rsa' -H 192.168.0.59 -f /tmp/images -a 4805bee#########\n
ftp_password login, Process images in /tmp/images using Snapshot SDK:
ftp_and_sftp_processor.py -c sftp -U usr1 -P pass -H 192.168.0.59 -f /tmp/images -s http://localhost:8080\n
Process images in /tmp/images Periodically every 5 seconds:
ftp_and_sftp_processor.py -c sftp -U usr1 -P pass -H 192.168.0.59 -f /tmp/images -a 4805bee######### -i 5
"""
    parser.add_argument("-t", "--timestamp", help="Timestamp.", required=False)
    parser.add_argument("-H", "--hostname", help="host", required=True)
    # type=int so a port given on the command line has the same type as the
    # integer defaults set below.
    parser.add_argument("-p", "--port", help="port", type=int, required=False)
    parser.add_argument(
        "-U", "--ftp-user", help="Transfer protocol server user", required=True
    )
    parser.add_argument(
        "-P",
        "--ftp-password",
        help="Transfer protocol server user's password",
        required=False,
    )
    parser.add_argument("--pkey", help="SFTP Private Key Path", required=False)
    parser.add_argument(
        "-d",
        "--delete",
        type=int,
        help="Remove images from the FTP server after processing. Optionally specify a timeout in seconds.",
        nargs="?",
        const=0,
    )
    parser.add_argument(
        "-f",
        "--folder",
        help="Specify folder with images on the FTP server.",
        default="/",
    )
    parser.add_argument(
        "--cameras-root", help="Root folder containing dynamic cameras", required=False
    )
    parser.add_argument("-o", "--output-file", help="Save result to file.")
    parser.add_argument(
        "--format",
        help="Format of the result.",
        default="json",
        choices=["json", "csv"],
    )
    parser.add_argument(
        "--mmc",
        action="store_true",
        help="Predict vehicle make and model (SDK only). It has to be enabled.",
    )
    parser.add_argument(
        "-i",
        "--interval",
        type=int,
        help="Periodically fetch new images from the server every interval seconds.",
    )

    # The default port depends on --protocol, so the command line is parsed
    # once here (all options are registered at this point) to read it.
    selected_protocol = parser.parse_args().protocol
    parser.set_defaults(port=21 if selected_protocol == "ftp" else 22)
574
575
576
def _close_processor(file_processor):
    """Best-effort shutdown of the connection opened by ``file_processor``."""
    try:
        close = getattr(file_processor, "close", None)
        if callable(close):
            close()
            return

        # Processor without a close() method: release the clients directly.
        ftp = getattr(file_processor, "ftp", None)
        if ftp is not None:
            try:
                ftp.quit()
            except ftplib.all_errors:
                ftp.close()
        sftp = getattr(file_processor, "sftp", None)
        if sftp is not None:
            sftp.close()
    except Exception as e:
        # Closing must never hide the outcome of the processing itself.
        logging.debug(f"Error while closing the connection: {e}")


def ftp_process(args):
    """Connect with the selected protocol and process one or many cameras.

    With ``args.cameras_root`` set, every sub-folder of that root is handled
    as one camera whose id is the folder name; otherwise ``args.folder`` is
    processed as a single camera. The connection is closed before returning.

    :param args: parsed command line arguments (``args.folder`` and
        ``args.camera_id`` are overwritten in cameras-root mode)
    :raises Exception: for SFTP without a password and without a private key
    """
    # Local import: remote paths always use "/" whatever the client OS is.
    import posixpath

    args_dict = vars(args)

    if args.protocol == "ftp":
        file_processor = FTPProcessor(**args_dict)
    else:
        if not args.ftp_password and not args.pkey:
            raise Exception("ftp_password or pkey path are required")
        file_processor = SFTPProcessor(**args_dict)

    file_processor.connect()

    try:
        if not args.cameras_root:
            file_processor.processing_single_camera(args)
            return

        file_processor.set_working_directory(args.cameras_root)
        # Use the path reported by the server when available, so camera
        # folders are still reachable after the working directory has moved.
        cameras_root = file_processor.get_working_directory() or args.cameras_root
        _, dirs, _ = file_processor.retrieve_files()
        for folder in dirs:
            logging.info(
                f"Processing Dynamic Camera : {file_processor.get_working_directory()}"
            )
            args.folder = posixpath.join(cameras_root, folder)
            # The camera id is the folder name
            args.camera_id = folder
            # The processor copied the arguments when it was created, so the
            # new id has to be set on it as well to be used for recognition.
            file_processor.camera_id = folder

            file_processor.processing_single_camera(args)
    finally:
        # A new processor is created on every call: release its connection
        # so periodic runs do not pile up open sessions.
        _close_processor(file_processor)
607
608
609
def main():
    """Command line entry point.

    Parses the arguments, then runs ``ftp_process`` once, or forever with a
    pause of ``args.interval`` seconds between runs when a positive interval
    is given. In periodic mode an error in one run is printed and the loop
    keeps going.
    """
    args = parse_arguments(custom_args)

    periodic = args.interval and args.interval > 0
    if not periodic:
        ftp_process(args)
        return

    while True:
        try:
            ftp_process(args)
        except Exception as e:
            print(f"ERROR: {e}")
        time.sleep(args.interval)


if __name__ == "__main__":
    main()
625
626