Path: blob/master/ftp_and_sftp_processor.py
1089 views
#!/usr/bin/env python
"""Read license plates from images stored on an FTP or SFTP server.

The script connects to the server, lists a folder (and its first-level
sub-folders, whose names are used as camera ids), downloads every image into
a temporary file, sends it to the recognition API and prints or saves the
results. It can optionally delete processed images and poll periodically.
"""
import argparse
import ftplib
import functools
import json
import logging
import os
import posixpath
import sys
import tempfile
import time
from abc import ABC, abstractmethod
from collections.abc import Callable
from datetime import datetime, timedelta
from ftplib import FTP, error_perm, error_reply
from typing import Any

import paramiko

from plate_recognition import recognition_api, save_results

LOG_LEVEL = os.environ.get("LOGGING", "INFO").upper()

logging.basicConfig(
    stream=sys.stdout,
    level=LOG_LEVEL,
    style="{",
    format="{asctime} {levelname} {name} {threadName} : {message}",
)

lgr = logging.getLogger(__name__)

# First character of a Unix `ls -l` mode string: regular file, directory,
# symlink, block/char device, socket, named pipe.
_UNIX_TYPE_CHARS = ("-", "d", "l", "b", "c", "s", "p")

# A listing date may legitimately be slightly ahead of the local clock
# (server time zone, clock skew) without belonging to the previous year.
_CLOCK_SKEW_TOLERANCE = timedelta(days=1)

# Default ports used when --port is not given.
_DEFAULT_PORTS = {"ftp": 21, "sftp": 22}


def get_files_and_dirs(
    func: Callable[[Any, list, list, list], None],
) -> Callable[[Any], tuple[list, list, list]]:
    """Decorate a ``retrieve_files`` implementation.

    The wrapped method is called with the raw listing plus two empty lists
    (``dirs`` and ``nondirs``) that it must fill; the wrapper returns the
    three lists as a tuple.
    """

    @functools.wraps(func)
    def wrapper(self):
        # A failed listing must not crash the classification loop.
        file_list = self.list_files() or []
        dirs = []
        nondirs = []
        func(self, file_list, dirs, nondirs)
        return file_list, dirs, nondirs

    return wrapper


class FileTransferProcessor(ABC):
    """Common workflow for processing images found on a remote file server.

    Every keyword argument (normally the parsed command-line options) is
    stored as an attribute of the instance. Subclasses implement the
    protocol-specific primitives.
    """

    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

        # Remote paths already sent to the recognition API.
        self.processed = []

    @abstractmethod
    def connect(self):
        """Open the connection to the server."""

    @abstractmethod
    def delete_file(self, file):
        """Delete ``file`` and return a human-readable status message."""

    @abstractmethod
    def list_files(self):
        """Return the current directory listing as a list of field lists."""

    @abstractmethod
    def set_ftp_binary_file(self, file, image):
        """Download remote ``file`` into the open file object ``image``."""

    @abstractmethod
    def set_working_directory(self, path):
        """Change the remote working directory; return False on failure."""

    @abstractmethod
    def get_working_directory(self):
        """Return the remote working directory."""

    @abstractmethod
    def retrieve_files(self):
        """Return ``(file_list, dirs, nondirs)`` for the working directory."""

    def close(self):
        """Release the connection. The default implementation does nothing."""

    def _parse_listing_entry(self, info):
        """Interpret one listing row.

        :param info: fields of one listing line
        :return: ``(name, is_dir, modified)`` or None for rows that are not
            file entries (blank lines, ``total N`` headers)
        """
        if len(info) < 4:
            return None

        name = info[-1]
        ls_type = info[0] if self.os_linux else info[-2]
        if ls_type.startswith("d") or ls_type == "<DIR>":
            return name, True, None

        if self.os_linux:
            return name, False, self.parse_date(info[-4], info[-3], info[-2])

        file_date = info[0].split("-")
        file_time = info[1]
        if "AM" in file_time or "PM" in file_time:
            # from AM/PM to 24-hour format
            file_time = datetime.strptime(file_time, "%I:%M%p").strftime("%H:%M")

        modified = self.parse_date(
            file_date[0], file_date[1], file_time, linux=False
        )
        return name, False, modified

    def _classify_entries(self, file_list, dirs, nondirs):
        """Append directory names to ``dirs`` and ``[name, modified]`` to ``nondirs``."""
        for info in file_list:
            entry = self._parse_listing_entry(info)
            if entry is None:
                continue
            name, is_dir, modified = entry
            if is_dir:
                dirs.append(name)
            else:
                nondirs.append([name, modified])

    def _processed_key(self, file, cwd=None):
        """Return the key under which ``file`` is tracked in ``self.processed``.

        The key is the full remote path so that equally named files in
        different camera folders do not shadow each other.
        """
        if cwd is None:
            cwd = self.get_working_directory()
        return posixpath.join(cwd or "", file)

    def processing_single_camera(self, args):
        """Process ``args.folder`` and each of its first-level sub-folders.

        Files directly inside ``args.folder`` are reported with
        ``args.camera_id``; files in a sub-folder are reported with the
        sub-folder name as camera id. Deeper levels are ignored.

        :param args: parsed command-line options (``folder``, ``camera_id``)
        """
        # Keep the processor in sync with the caller, which may have changed
        # the camera id since the processor was created (--cameras-root).
        self.camera_id = getattr(args, "camera_id", getattr(self, "camera_id", None))

        if self.set_working_directory(args.folder) is False:
            # Processing (or deleting from) whichever directory happens to be
            # current would attribute files to the wrong camera.
            return

        _, dirs, nondirs = self.retrieve_files()
        base_dir = self.get_working_directory()

        lgr.info("Found %s file(s) in %s.", len(nondirs), base_dir)
        self.process_files(nondirs)

        for folder in dirs:
            # Sub-folders are siblings: address each one from the base
            # directory, not from the previously visited sub-folder.
            folder_path = posixpath.join(base_dir, folder) if base_dir else f"./{folder}"
            if self.set_working_directory(folder_path) is False:
                continue
            self.camera_id = folder

            ignored_dirs = []  # Don't process files any deeper
            folder_files = []
            self._classify_entries(self.list_files() or [], ignored_dirs, folder_files)

            lgr.info(
                "Found %s file(s) in %s.",
                len(folder_files),
                self.get_working_directory(),
            )
            self.process_files(folder_files)

    def track_processed(self):
        """
        Tell whether processed files are tracked: an interval is specified
        and no deletion timeout is set.

        :return: Boolean
        """
        return bool(self.interval and self.interval > 0 and not self.delete)

    def manage_processed_file(self, file, last_modified):
        """
        Delete ``file`` from the server once it is older than ``self.delete``
        seconds and stop tracking it.

        :param file: file name in the current working directory
        :param last_modified: last modified datetime
        """
        rm_older_than_date = datetime.now() - timedelta(seconds=self.delete)
        if rm_older_than_date <= last_modified:
            return

        result = self.delete_file(file)
        # Both implementations start their success message this way; testing
        # for the substring "error" misfires on file names and on messages
        # such as "File not found: ...".
        if not result.startswith(f"File {file} deleted"):
            print(f"file couldn't be deleted: {result}")
            return

        for key in (self._processed_key(file), file):
            if key in self.processed:
                self.processed.remove(key)

    def process_files(self, ftp_files):
        """Run recognition on every file, then print or save the results.

        Files already recognized in an earlier pass are skipped when tracking
        is enabled. When ``self.delete`` is set, a file is removed from the
        server after it was processed and once it is old enough.

        :param ftp_files: list of ``[name, last_modified]`` entries
        """
        tracking = self.track_processed()
        cwd = self.get_working_directory() if tracking else None
        results = []

        for ftp_file, last_modified in ftp_files:
            key = self._processed_key(ftp_file, cwd) if tracking else None
            if tracking and key in self.processed:
                # Already recognized; only retry a deletion that is pending.
                if self.delete is not None:
                    self.manage_processed_file(ftp_file, last_modified)
                continue

            lgr.info("%s", ftp_file)

            with tempfile.NamedTemporaryFile(
                suffix="_" + ftp_file, mode="rb+"
            ) as image:
                self.set_ftp_binary_file(ftp_file, image)
                api_res = recognition_api(
                    image,
                    self.regions,
                    self.api_key,
                    self.sdk_url,
                    camera_id=self.camera_id,
                    timestamp=self.timestamp,
                    mmc=self.mmc,
                    exit_on_error=False,
                )
                results.append(api_res)

            if tracking:
                self.processed.append(key)

            # NOTE(review): the file is deleted even when recognition did not
            # return a usable result - confirm this is acceptable.
            if self.delete is not None:
                self.manage_processed_file(ftp_file, last_modified)

        if self.output_file:
            save_results(results, self)
        else:
            print(json.dumps(results, indent=2))

    def get_month_literal(self, month_number):
        """Map a two-digit month ("01".."12") to its abbreviation, else "unknown"."""
        month_mapping = {
            "01": "jan",
            "02": "feb",
            "03": "mar",
            "04": "apr",
            "05": "may",
            "06": "jun",
            "07": "jul",
            "08": "aug",
            "09": "sep",
            "10": "oct",
            "11": "nov",
            "12": "dec",
        }

        return month_mapping.get(month_number.lower(), "unknown")

    def parse_date(self, x, y, z, linux=True):
        """
        Parse the date columns of a listing line.

        M D T|Y
        Jan 3 1994
        Jan 17 1993
        Sep 13 19:07

        :param x: month (abbreviation, or two-digit number when not linux)
        :param y: day of month
        :param z: year, or HH:MM for recent files
        :param linux: False when ``x`` is a month number
        :return: naive datetime
        :raises ValueError: if the fields do not form a valid date
        """
        if not linux:
            x = self.get_month_literal(x)

        date_string = f"{x} {int(y):02} {z}"
        lgr.debug("Input Date String: %s", date_string)

        if ":" in z:
            date_time_obj = self._resolve_recent_date(date_string)
        else:
            date_time_obj = datetime.strptime(date_string, "%b %d %Y")

        lgr.debug("Parsed date: %s", date_time_obj)
        return date_time_obj

    @staticmethod
    def _resolve_recent_date(date_string):
        """Parse a "Mon DD HH:MM" date, which carries no year.

        The year is always part of the parsed string: strptime otherwise
        assumes 1900 and rejects Feb 29. Listings omit the year only for
        recent files, so a date that would lie in the future belongs to the
        previous year.
        """
        now = datetime.now()
        latest_plausible = now + _CLOCK_SKEW_TOLERANCE
        fallback = None

        for year in (now.year, now.year - 1):
            try:
                candidate = datetime.strptime(
                    f"{date_string} {year}", "%b %d %H:%M %Y"
                )
            except ValueError:
                continue  # e.g. Feb 29 in a non-leap year
            if candidate <= latest_plausible:
                return candidate
            if fallback is None:
                fallback = candidate

        if fallback is None:
            raise ValueError(f"Cannot parse listing date: {date_string!r}")
        return fallback


class FTPProcessor(FileTransferProcessor):
    """File processor for plain FTP servers (Unix or DOS style listings)."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.ftp = None
        self.os_linux = None

    def connect(self):
        """Connect and log in; return the ``ftplib.FTP`` client."""
        self.ftp = FTP(timeout=120)
        self.ftp.connect(self.hostname, self.port)
        self.ftp.login(self.ftp_user, self.ftp_password)
        lgr.info("Connected to FTP server at %s", self.hostname)
        return self.ftp

    def close(self):
        """Close the FTP session, politely if the server still answers."""
        if self.ftp is None:
            return
        try:
            self.ftp.quit()
        except ftplib.all_errors:
            self.ftp.close()
        finally:
            self.ftp = None

    def delete_file(self, file):
        """Delete ``file``; return a status message instead of raising."""
        try:
            response = self.ftp.delete(file)
            return f"File {file} deleted. Server response: {response}"
        except error_perm as e:
            return f"Permission error: {e}"
        except error_reply as e:
            return f"Other FTP error: {e}"
        except Exception as e:
            return f"An unexpected error occurred: {e}"

    def get_working_directory(self):
        """Return the server's current directory."""
        return self.ftp.pwd()

    def set_working_directory(self, path):
        """Change directory; print the error and return False if refused."""
        try:
            self.ftp.cwd(path)
        except ftplib.error_perm as e:
            print(f"Error 550: {e}")
            return False
        return True

    def is_linux_os(self, file_list):
        """Tell whether the listing is Unix style (True) or DOS style (False).

        :return: None when the listing holds no file entry to inspect
        """
        for info in file_list:
            if len(info) < 4:
                continue  # blank line or "total N" header
            return info[0].startswith(_UNIX_TYPE_CHARS)
        return None

    def set_ftp_binary_file(self, file, image):
        """Retrieve a file in binary transfer mode

        Args:
            file (String): remote file path
            image (TemporaryFile): image file destination
        """
        self.ftp.retrbinary("RETR " + file, image.write)

    def list_files(self):
        """Return the LIST output, one list of fields per line."""
        file_list = []

        def collect(line):
            # DOS style lines start with the date and have 4 columns, Unix
            # style lines have 9; limiting the split keeps spaces in names.
            columns = 3 if line[:1].isdigit() else 8
            file_list.append(line.split(maxsplit=columns))

        self.ftp.retrlines("LIST", collect)
        return file_list

    @get_files_and_dirs
    def retrieve_files(self, file_list, dirs, nondirs):
        """Detect the listing style and classify the entries."""
        detected = self.is_linux_os(file_list)
        if detected is not None:
            # An empty folder gives no hint; keep the previous detection.
            self.os_linux = detected
        self._classify_entries(file_list, dirs, nondirs)


class SFTPProcessor(FileTransferProcessor):
    """File processor for SFTP servers (password or private key login)."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.ssh = None
        self.sftp = None
        self.os_linux = True

    def connect(self):
        """Open the SSH connection and the SFTP channel.

        :return: the paramiko SFTP client
        :raises Exception: the connection error, after it has been logged
        """
        ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy trusts unknown host keys, which allows
        # man-in-the-middle attacks; consider loading known hosts instead.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        try:
            if self.ftp_password and not self.pkey:
                ssh.connect(
                    self.hostname,
                    port=self.port,
                    username=self.ftp_user,
                    password=self.ftp_password,
                    look_for_keys=False,
                    allow_agent=False,
                )
            else:
                key = paramiko.RSAKey.from_private_key_file(self.pkey)
                ssh.connect(self.hostname, self.port, self.ftp_user, pkey=key)

            self.sftp = ssh.open_sftp()
        except Exception as e:
            if isinstance(e, paramiko.AuthenticationException):
                lgr.error("Authentication failed. Please check your credentials.")
            elif isinstance(e, paramiko.SSHException):
                lgr.error("SSH connection error: %s", e)
            else:
                lgr.error("An unexpected error occurred: %s", e)
            ssh.close()
            # Continuing without a connection only fails later and less clearly.
            raise

        self.ssh = ssh
        lgr.info("Connected to SFTP server at %s", self.hostname)
        return self.sftp

    def close(self):
        """Close the SFTP channel and the SSH connection."""
        for handle in (self.sftp, self.ssh):
            if handle is not None:
                handle.close()
        self.sftp = None
        self.ssh = None

    def delete_file(self, file):
        """Delete ``file``; return a status message instead of raising."""
        try:
            self.sftp.remove(file)
            return f"File {file} deleted."
        except FileNotFoundError as e:
            return f"File not found: {e}"
        except OSError as e:
            return f"An IOError occurred: {e}"
        except Exception as e:
            return f"An unexpected error occurred: {e}"

    def get_working_directory(self):
        """Return the directory set with ``set_working_directory`` (or None)."""
        return self.sftp.getcwd()

    def set_working_directory(self, path):
        """Change directory; print the error and return False on failure."""
        try:
            self.sftp.chdir(path)
        except Exception as e:
            print(f"Error changing working directory: {e}")
            return False
        return True

    def set_ftp_binary_file(self, file, image):
        """Copy a remote file (remotepath) from the SFTP server and write to an open file or file-like object

        Args:
            file (String): remote file path
            image (TemporaryFile): image file destination
        """
        wd = self.get_working_directory()
        self.sftp.getfo(posixpath.join(wd, file) if wd else file, image)

    def list_files(self):
        """Return the directory listing in the same layout as an FTP LIST.

        :return: list of field lists; empty if the listing failed
        """
        try:
            entries = self.sftp.listdir_attr()
        except Exception as e:
            print(f"Error listing files: {e}")
            return []

        file_list = []
        for attr in entries:
            # Limiting the split keeps spaces in the file name.
            info = str(attr).split(maxsplit=8)
            if len(info) >= 4:
                # info format: ['-rw-------', '1', '0', '0', '175289', '16', 'Nov', '18:10', 'demo.jpg']
                # adapting to linux standard: ['-rw-------', '1', '0', '0', '175289', 'Nov', '16', '18:10', 'demo.jpg']
                info[-4], info[-3] = info[-3], info[-4]
            file_list.append(info)

        return file_list

    @get_files_and_dirs
    def retrieve_files(self, file_list, dirs, nondirs):
        """Classify the entries of the current directory."""
        self._classify_entries(file_list, dirs, nondirs)


def parse_arguments(args_hook=lambda _: _):
    """Build the command-line parser, let ``args_hook`` extend it, and parse.

    :param args_hook: callable receiving the parser before parsing
    :return: parsed arguments
    :raises Exception: if neither an API key nor an SDK url is given
    """
    parser = argparse.ArgumentParser(
        description="Read license plates from the images on an FTP server and output the result as JSON or CSV",
        epilog="""
Examples:\n

FTP:
---
Process images on an FTP server:
ftp_and_sftp_processor.py -a MY_API_KEY -H host -U user1 -P pass
Specify Camera ID and/or two Regions:
ftp_and_sftp_processor.py -a MY_API_KEY -H host -U user1 -P pass -f /home/user1 --camera-id Camera1 -r us-ca -r th-37
Use the Snapshot SDK instead of the Cloud Api:
ftp_and_sftp_processor.py -H host -U user1 -P pass -s http://localhost:8080

""",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("-a", "--api-key", help="Your API key.", required=False)
    parser.add_argument(
        "-r",
        "--regions",
        help="Match the license plate pattern fo specific region",
        required=False,
        action="append",
    )
    parser.add_argument(
        "-s",
        "--sdk-url",
        help="Url to self hosted sdk For example, http://localhost:8080",
        required=False,
    )
    parser.add_argument(
        "--camera-id", help="Name of the source camera.", required=False
    )

    parser.add_argument(
        "-c",
        "--protocol",
        help="protocol tu use, available choices 'ftp' or 'sftp'",
        choices="ftp sftp".split(),
        default="ftp",
        required=False,
    )

    args_hook(parser)
    args = parser.parse_args()

    # The self-hosted SDK needs no API key (see the SDK example above).
    if not args.api_key and not args.sdk_url:
        raise Exception("api-key parameter is required")

    return args


def custom_args(parser):
    """Add the FTP/SFTP specific options and examples to ``parser``."""
    parser.epilog += """

Specify a folder on the FTP server:
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1
Delete processed files from the FTP server after 10 seconds:
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1 -d 10
Specify a folder containing dynamic cameras, Sub-folder names are Camera IDs:
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass --cameras-root /srv/cameras
Periodically check for new files every 10 seconds:
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1 -i 10
Enable Make Model and Color prediction:
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1 --mmc
Specify an output file and format for the results:
ftp_and_sftp_processor.py -a MY_API_KEY -H 192.168.0.59 -U user1 -P pass -f /home/user1 -o data.csv --format csv

SFTP:
----
ftp_password login, Process images in /tmp/images:
ftp_and_sftp_processor.py -c sftp -U usr1 -P pass -H 192.168.0.59 -f /tmp/images -a 4805bee#########\n
Private Key login, Process images in /tmp/images:
ftp_and_sftp_processor.py -c sftp -U usr1 --pkey '/home/danleyb2/.ssh/id_rsa' -H 192.168.0.59 -f /tmp/images -a 4805bee#########\n
ftp_password login, Process images in /tmp/images using Snapshot SDK:
ftp_and_sftp_processor.py -c sftp -U usr1 -P pass -H 192.168.0.59 -f /tmp/images -s http://localhost:8080\n
Process images in /tmp/images Periodically every 5 seconds:
ftp_and_sftp_processor.py -c sftp -U usr1 -P pass -H 192.168.0.59 -f /tmp/images -a 4805bee######### -i 5
"""
    parser.add_argument("-t", "--timestamp", help="Timestamp.", required=False)
    parser.add_argument("-H", "--hostname", help="host", required=True)
    # No default here: it depends on --protocol and is resolved in
    # ftp_process() once the arguments have been parsed.
    parser.add_argument("-p", "--port", type=int, help="port", required=False)
    parser.add_argument(
        "-U", "--ftp-user", help="Transfer protocol server user", required=True
    )
    parser.add_argument(
        "-P",
        "--ftp-password",
        help="Transfer protocol server user's password",
        required=False,
    )
    parser.add_argument("--pkey", help="SFTP Private Key Path", required=False)
    parser.add_argument(
        "-d",
        "--delete",
        type=int,
        help="Remove images from the FTP server after processing. Optionally specify a timeout in seconds.",
        nargs="?",
        const=0,
    )
    parser.add_argument(
        "-f",
        "--folder",
        help="Specify folder with images on the FTP server.",
        default="/",
    )
    parser.add_argument(
        "--cameras-root", help="Root folder containing dynamic cameras", required=False
    )
    parser.add_argument("-o", "--output-file", help="Save result to file.")
    parser.add_argument(
        "--format",
        help="Format of the result.",
        default="json",
        choices="json csv".split(),
    )
    parser.add_argument(
        "--mmc",
        action="store_true",
        help="Predict vehicle make and model (SDK only). It has to be enabled.",
    )
    parser.add_argument(
        "-i",
        "--interval",
        type=int,
        help="Periodically fetch new images from the server every interval seconds.",
    )


def ftp_process(args, processed=None):
    """Connect to the server and process the configured folder(s) once.

    :param args: parsed command-line options
    :param processed: optional list of already processed remote paths,
        shared between successive calls so that periodic runs skip them
    :raises Exception: if SFTP is selected without password or private key
    """
    if getattr(args, "port", None) is None:
        args.port = _DEFAULT_PORTS["ftp" if args.protocol == "ftp" else "sftp"]

    args_dict = vars(args)

    if args.protocol == "ftp":
        file_processor = FTPProcessor(**args_dict)
    else:
        if not args.ftp_password and not args.pkey:
            raise Exception("ftp_password or pkey path are required")
        file_processor = SFTPProcessor(**args_dict)

    if processed is not None:
        file_processor.processed = processed

    file_processor.connect()
    try:
        if not args.cameras_root:
            file_processor.processing_single_camera(args)
            return

        if file_processor.set_working_directory(args.cameras_root) is False:
            return

        # Absolute root, so camera folders stay reachable after the working
        # directory has moved into a previous camera.
        root = file_processor.get_working_directory() or args.cameras_root
        _, dirs, _ = file_processor.retrieve_files()
        for folder in dirs:
            # Remote paths always use "/" whatever the local platform is.
            args.folder = posixpath.join(root, folder)
            # The camera id is the folder name
            args.camera_id = folder
            lgr.info("Processing Dynamic Camera : %s", args.folder)

            file_processor.processing_single_camera(args)
    finally:
        file_processor.close()


def main():
    """Parse the command line and run once, or forever every ``--interval`` seconds."""
    args = parse_arguments(custom_args)

    if args.interval and args.interval > 0:
        # Shared between passes so files are recognized only once.
        processed = []
        while True:
            try:
                ftp_process(args, processed)
            except Exception as e:
                print(f"ERROR: {e}")
            time.sleep(args.interval)
    else:
        ftp_process(args)


if __name__ == "__main__":
    main()