sftp_utils

SFTP utils are used to interact with SFTP servers.

SFTPClient

An SFTP client with built-in retry, secret handling, and connection reuse that can be used with the KEYS_FILE or custom connection details.

This class allows interaction with SFTP servers, providing a high-level interface for file and directory operations, such as uploading, downloading, reading CSV/Excel files, and managing directories. The client supports automatic retries (up to 3 attempts with a 2-second wait between them) to improve reliability in the face of transient SFTP failures.
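The retry policy described above (up to 3 attempts, a fixed wait, retrying only on connection-type errors) can be sketched without third-party dependencies. This is a minimal illustration, not the library's implementation: the source applies tenacity's `retry` decorator, whereas `retry_fixed` and `flaky_listing` below are hypothetical names, and the wait is set to 0 only so the sketch runs instantly.

```python
import functools
import time


def retry_fixed(attempts=3, wait_seconds=2.0, retry_on=(ConnectionError, TimeoutError)):
    """Retry the wrapped callable on the given exception types with a fixed wait."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == attempts:
                        raise  # out of attempts: surface the last error
                    time.sleep(wait_seconds)

        return wrapper

    return decorator


calls = {"count": 0}


@retry_fixed(attempts=3, wait_seconds=0)  # 0 only to keep the demo fast
def flaky_listing():
    """Hypothetical operation that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return ["data.csv"]


result = flaky_listing()
```

Exceptions outside `retry_on` (for example `FileNotFoundError`) are not retried and propagate on the first attempt. One difference worth noting: unless `reraise=True` is passed, tenacity wraps the final failure in a `RetryError`, while this sketch re-raises the original exception.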

Custom operations are possible by using the connect() and close_connection() methods. The connect() method returns a pysftp.Connection object, which can be used for custom SFTP operations. The close_connection() method should be called to close the connection when done. This class also supports context management using the with statement, which automatically handles connection opening and closing.
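The two usage styles described above can be illustrated with a stand-in class instead of a real server. This is a sketch under stated assumptions: `FakeConnection` and `MiniClient` are hypothetical names invented for illustration, and only `connect()`, `close_connection()`, and the `with` protocol mirror the documented interface.

```python
class FakeConnection:
    """Stand-in for a live SFTP connection (hypothetical, no network I/O)."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class MiniClient:
    """Hypothetical client showing the connect/close and `with` lifecycle."""

    def __init__(self):
        self._connection = None

    def connect(self):
        # Open a connection on first use and hand back the same object afterwards.
        if self._connection is None:
            self._connection = FakeConnection()
        return self._connection

    def close_connection(self):
        # Safe to call when nothing is open.
        if self._connection is not None:
            self._connection.close()
            self._connection = None

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close_connection()  # runs even if the block raised


# Style 1: the with statement opens and closes the connection.
with MiniClient() as client:
    conn_in_block = client.connect()
    open_inside_block = not conn_in_block.closed

# Style 2: manual control; pair connect() with close_connection().
manual = MiniClient()
manual_conn = manual.connect()
try:
    pass  # custom operations on manual_conn would go here
finally:
    manual.close_connection()
```

Wrapping manual use in `try`/`finally` gives the same cleanup guarantee that the `with` statement provides automatically.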

Credentials can be provided either via the KEYS_FILE or directly through a dictionary (sftp_config). The "secret" value is the name of a secret, which is fetched from Azure Key Vault when the client is created. The configuration is validated upon initialization.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| keys_file_name | str | The name of the key in the YAML config used to load SFTP credentials. | None |
| sftp_config | dict | A dictionary with keys "host", "port", "username", and "secret". | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If neither keys_file_name nor sftp_config is provided. |
| KeyError | If required keys are missing from the configuration. |
| ValueError | If the port value is not a valid integer. |
| Exception | If secret retrieval or SFTP connection initialization fails. |
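The validation rules listed above can be sketched as a standalone function. This is an illustration modeled on the documented behavior rather than the class's own code; `validate_sftp_config` is a hypothetical name, and the host and secret values are placeholders.

```python
REQUIRED_KEYS = ("host", "port", "username", "secret")


def validate_sftp_config(sftp_config: dict) -> dict:
    """Return a validated copy of the config with the port coerced to int."""
    config = dict(sftp_config)  # work on a copy so the caller's dict is untouched
    for key in REQUIRED_KEYS:
        if key not in config:
            raise KeyError(f"Missing required SFTP configuration key: {key}")
    try:
        config["port"] = int(config["port"])
    except (TypeError, ValueError):
        raise ValueError("Port must be an integer.") from None
    return config


valid = validate_sftp_config(
    {"host": "sftp.example.com", "port": "22", "username": "user", "secret": "my-secret-name"}
)

errors = []
for bad in (
    {"host": "sftp.example.com", "port": 22, "username": "user"},  # no "secret"
    {"host": "sftp.example.com", "port": "ssh", "username": "user", "secret": "s"},
):
    try:
        validate_sftp_config(bad)
    except (KeyError, ValueError) as exc:
        errors.append(type(exc).__name__)
```

A port given as a numeric string is accepted and converted, which is convenient when the value comes from a YAML or environment-based configuration.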

Example
from physical_operations_utils.sftp_utils import SFTPClient

# Using YAML config with context manager
with SFTPClient(keys_file_name="my_sftp") as client:
    client.upload_file("local_file.csv", "/remote/dir/data.csv")

# Or using manual connection control
client = SFTPClient(sftp_config={
    "host": "sftp.example.com",
    "port": 22,
    "username": "user",
    "secret": "my-sftp-password-secret"
})

client.connect()
df = client.read_csv_file("/remote/data.csv")
client.download_file("/remote/data.csv", "local.csv")
client.close_connection()
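In the source, read_csv_file reads the remote file's bytes and parses them from an in-memory buffer rather than from a temporary file on disk. The sketch below shows that pattern in isolation. As an assumption made to keep it dependency-free, it uses the standard library `csv` module in place of pandas; `parse_csv_bytes` and the sample bytes are hypothetical.

```python
import csv
from io import BytesIO, TextIOWrapper


def parse_csv_bytes(file_data: bytes, encoding: str = "utf-8") -> list:
    """Parse CSV bytes held in memory into a list of row dictionaries."""
    buffer = BytesIO(file_data)
    # Wrap the byte buffer so the csv module sees decoded text lines.
    text = TextIOWrapper(buffer, encoding=encoding, newline="")
    return list(csv.DictReader(text))


# Bytes as they might be returned by reading a remote file opened in "rb" mode.
remote_bytes = b"id,value\n1,10.5\n2,7.25\n"
rows = parse_csv_bytes(remote_bytes)
```

Unlike a pandas DataFrame, the rows here keep every value as a string; type inference is one of the things `pandas.read_csv` adds on top of this pattern.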
Source code in physical_operations_utils/sftp_utils.py
class SFTPClient:
    """
    An SFTP client with built-in retry, secret handling, and connection reuse that can be used with the KEYS_FILE
    or custom connection details.

    This class allows interaction with SFTP servers, providing a high-level interface for file and
    directory operations, such as uploading, downloading, reading CSV/Excel files, and managing
    directories. The client supports automatic retries (up to 3 attempts with a 2-second wait between them)
    to improve reliability in the face of transient SFTP failures.

    Custom operations are possible by using the connect() and close_connection() methods. The connect() method
    returns a pysftp.Connection object, which can be used for custom SFTP operations. The close_connection()
    method should be called to close the connection when done. This class also supports context management
    using the `with` statement, which automatically handles connection opening and closing.

    Credentials can be provided either via the KEYS_FILE or directly through a dictionary (`sftp_config`).
    The `"secret"` value is the name of a secret, which is fetched from Azure Key Vault when the client is
    created. The configuration is validated upon initialization.

    Args:
        keys_file_name (str, optional): The name of the key in the YAML config used to load SFTP credentials.
        sftp_config (dict, optional): A dictionary with keys `"host"`, `"port"`, `"username"`, and `"secret"`.

    Raises:
        ValueError: If neither `keys_file_name` nor `sftp_config` is provided.
        KeyError: If required keys are missing from the configuration.
        ValueError: If the port value is not a valid integer.
        Exception: If secret retrieval or SFTP connection initialization fails.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        # Using YAML config with context manager
        with SFTPClient(keys_file_name="my_sftp") as client:
            client.upload_file("local_file.csv", "/remote/dir/data.csv")

        # Or using manual connection control
        client = SFTPClient(sftp_config={
            "host": "sftp.example.com",
            "port": 22,
            "username": "user",
            "secret": "my-sftp-password-secret"
        })

        client.connect()
        df = client.read_csv_file("/remote/data.csv")
        client.download_file("/remote/data.csv", "local.csv")
        client.close_connection()
        ```
    """

    def __init__(
        self, keys_file_name: Optional[str] = None, sftp_config: Optional[dict] = None
    ):
        if not keys_file_name and not sftp_config:
            raise ValueError("Either keys_file_name or sftp_config must be provided.")
        setup_environment()
        if keys_file_name:
            use_sftp_config = get_keys_yaml_file()[keys_file_name]
        else:
            use_sftp_config = sftp_config.copy()
        self.sftp_config = self._validate_sftp_config(use_sftp_config)
        try:
            # "secret" holds the secret's name; the resolved value is stored as the password.
            self.sftp_config["password"] = get_secret(self.sftp_config["secret"])
        except Exception as e:
            # Chain the original error so the root cause stays in the traceback.
            raise Exception(
                f"Failed to retrieve secret for SFTP configuration: {e}"
            ) from e
        cnopts = pysftp.CnOpts()
        # NOTE: hostkeys = None disables SSH host key verification for this connection.
        cnopts.hostkeys = None
        self.sftp_config["cnopts"] = cnopts
        self._connection = None

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def close_connection(self):
        """
        Closes the active SFTP connection and resets the connection state.

        This method should be called after using a manually-managed connection to
        ensure the session is cleanly closed. It is also automatically used when
        exiting a `with SFTPClient(...)` context manager block.

        If no connection is active, this method does nothing.

        Retries up to 3 times with a 2-second wait between attempts if the close fails.

        Example:
            ```python
            client = SFTPClient(keys_file_name="my_sftp")
            client.connect()
            # perform custom SFTP operations...
            client.close_connection()
            ```
        """
        if self._connection:
            self._connection.close()
            self._connection = None

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def connect(self) -> pysftp.Connection:
        """
        Establishes and returns a reusable connection to the SFTP server.

        Uses the credentials provided at initialization to establish a connection
        to the remote SFTP server. The connection is cached and reused for all future
        operations within the same `SFTPClient` instance.

        This method is automatically called when using any of the public methods
        (e.g., `upload_file`, `download_file`, etc.), but can also be used manually
        when custom SFTP operations are needed.

        Retries up to 3 times with a 2-second wait between attempts if the connection fails.

        Returns:
            pysftp.Connection: A live SFTP connection object that can be used for custom operations.

        Raises:
            Exception: If the connection attempt fails after all retry attempts.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            client = SFTPClient(keys_file_name="my_sftp")
            sftp_conn = client.connect()
            files = sftp_conn.listdir("/remote/path")
            client.close_connection()
            ```
        """
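        try:
            # Track the connection on the instance so close_connection() (and the
            # context manager exit) has something to close.
            self._connection = pysftp.Connection(
                host=self.sftp_config["host"],
                username=self.sftp_config["username"],
                password=self.sftp_config["password"],
                port=self.sftp_config["port"],
                cnopts=self.sftp_config["cnopts"],
            )
            return self._connection
        except Exception as e:
            logging.error(f"Failed to connect to SFTP server: {e}")
            raise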

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def create_directory(
        self,
        remote_path: str,
    ) -> None:
        """
        Creates a new directory on the remote SFTP server, including any missing parent directories.

        Args:
            remote_path (str): The full remote path to the directory to be created.

        Raises:
            FileExistsError: If the directory already exists on the remote server.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            client = SFTPClient(keys_file_name="my_sftp")
            client.connect()
            # Create a new directory on the remote server
            client.create_directory("/remote/new_folder")
            client.close_connection()
            ```
        """
        sftp_conn = self.connect()
        with sftp_conn:
            if sftp_conn.exists(remote_path):
                raise FileExistsError(f"Directory {remote_path} already exists.")
            sftp_conn.makedirs(remote_path)

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def delete_file(self, remote_path: str) -> None:
        """
        Deletes a file from the remote SFTP server.

        Args:
            remote_path (str): The full remote path of the file to delete.

        Raises:
            FileNotFoundError: If the file does not exist.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            client = SFTPClient(keys_file_name="my_sftp")
            client.connect()
            # Delete a file from the remote server
            client.delete_file("/remote/file.csv")
            client.close_connection()
            ```
        """
        sftp_conn = self.connect()
        with sftp_conn:
            if not sftp_conn.exists(remote_path):
                raise FileNotFoundError(f"File {remote_path} does not exist.")
            sftp_conn.remove(remote_path)

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def download_file(
        self,
        remote_path: str,
        local_path: str,
        overwrite_existing: bool = False,
    ) -> None:
        """
        Downloads a file from the SFTP server to the local filesystem.

        Args:
            remote_path (str): The full path to the file on the remote SFTP server.
            local_path (str): The full path on the local machine where the file will be saved.
            overwrite_existing (bool): Whether to overwrite the file if it already exists locally.

        Raises:
            FileNotFoundError: If the remote file does not exist.
            FileExistsError: If the local file exists and overwrite is disabled.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            client = SFTPClient(keys_file_name="my_sftp")
            client.connect()
            # Download a file from the remote server
            client.download_file("/remote/file.csv", "./file.csv")
            client.close_connection()
            ```
        """
        sftp_conn = self.connect()
        with sftp_conn:
            if not sftp_conn.exists(remote_path):
                raise FileNotFoundError(f"File {remote_path} does not exist.")
            if os.path.exists(local_path) and not overwrite_existing:
                raise FileExistsError(
                    f"Local file '{local_path}' already exists and overwrite is disabled."
                )
            sftp_conn.get(remote_path, local_path)

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def list_directories(self, remote_path: str) -> List[str]:
        """
        Lists all subdirectories in a remote directory.

        Args:
            remote_path (str): Path to the remote directory to inspect.

        Returns:
            List[str]: A list of subdirectory names.

        Raises:
            FileNotFoundError: If the path does not exist on the remote server.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            client = SFTPClient(keys_file_name="my_sftp")
            client.connect()
            # List all subdirectories in a remote directory
            folders = client.list_directories("/remote/data")
            print(folders)
            client.close_connection()
            ```
        """
        sftp_conn = self.connect()
        with sftp_conn:
            if not sftp_conn.exists(remote_path):
                raise FileNotFoundError(f"Path {remote_path} does not exist.")
            return [
                item.filename
                for item in sftp_conn.listdir_attr(remote_path)
                if stat.S_ISDIR(item.st_mode)
            ]

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def list_file_details(self, remote_path: str) -> List[dict]:
        """
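        Returns one metadata dictionary for each entry (file or directory) in a remote directory.

        Args:
            remote_path (str): The remote path to inspect.

        Returns:
            List[dict]: One dictionary per entry with the keys "filename", "size" (bytes),
                "permissions" (e.g. "-rw-r--r--"), "last_modified" and "last_accessed"
                (seconds since the Unix epoch), and "is_directory".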

        Raises:
            FileNotFoundError: If the path does not exist on the remote server.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            client = SFTPClient(keys_file_name="my_sftp")
            client.connect()
            # List all files in a remote directory
            details = client.list_file_details("/remote")
            for file in details:
                print(file)
            client.close_connection()
            ```
        """
        sftp_conn = self.connect()
        with sftp_conn:
            if not sftp_conn.exists(remote_path):
                raise FileNotFoundError(f"Path {remote_path} does not exist.")
            return [
                {
                    "filename": item.filename,
                    "size": item.st_size,
                    "permissions": stat.filemode(item.st_mode),
                    "last_modified": item.st_mtime,
                    "last_accessed": item.st_atime,
                    "is_directory": stat.S_ISDIR(item.st_mode),
                }
                for item in sftp_conn.listdir_attr(remote_path)
            ]

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def list_files(self, path_on_sftp: str) -> List[str]:
        """
        Lists the names of all files and directories in a given remote path.

        Args:
            path_on_sftp (str): Remote directory path to list contents from.

        Returns:
            List[str]: A list of filenames and folder names in the specified path.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            client = SFTPClient(keys_file_name="my_sftp")
            client.connect()
            # List all files and directories in a remote path
            files = client.list_files("/remote")
            print(files)
            client.close_connection()
            ```
        """
        sftp_conn = self.connect()
        with sftp_conn:
            files = sftp_conn.listdir(path_on_sftp)
            return files

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def move_file(
        self,
        remote_path: str,
        new_path: str,
    ) -> None:
        """
        Moves a file on the SFTP server.

        Args:
            remote_path (str): The current path of the file.
            new_path (str): The new path or name for the file.

        Raises:
            FileNotFoundError: If the source file does not exist.
            FileExistsError: If a file already exists at the destination path.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            client = SFTPClient(keys_file_name="my_sftp")
            client.connect()
            # Move a file on the remote server
            client.move_file("/remote/old.csv", "/remote/archive/old.csv")
            client.close_connection()
            ```
        """
        sftp_conn = self.connect()
        with sftp_conn:
            if not sftp_conn.exists(remote_path):
                raise FileNotFoundError(f"File {remote_path} does not exist.")
            if sftp_conn.exists(new_path):
                raise FileExistsError(f"File {new_path} already exists on SFTP server.")
            sftp_conn.rename(remote_path, new_path)

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def read_csv_file(self, remote_path: str, **kwargs: Any) -> pd.DataFrame:
        """
        Reads a CSV file from the SFTP server into a pandas DataFrame.

        Keyword arguments are passed directly through to `pandas.read_csv`. See pandas documentation for details.

        Args:
            remote_path (str): Full path to the remote CSV file.
            **kwargs: Additional keyword arguments passed to `pandas.read_csv`.

        Returns:
            pd.DataFrame: A DataFrame containing the contents of the CSV file.

        Raises:
            FileNotFoundError: If the file does not exist on the SFTP server.
            ValueError: If the file extension is not '.csv'.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            client = SFTPClient(keys_file_name="my_sftp")
            client.connect()
            # Read a CSV file from the remote server
            df = client.read_csv_file("/remote/data.csv", sep=",")
            print(df.head())
            client.close_connection()
            ```
        """
        _, ext = os.path.splitext(remote_path.lower())
        if ext != ".csv":
            raise ValueError(f"File {remote_path} does not have a valid CSV extension.")

        sftp_conn = self.connect()
        with sftp_conn:
            if not sftp_conn.exists(remote_path):
                raise FileNotFoundError(f"File {remote_path} does not exist.")
            with sftp_conn.open(remote_path, "rb") as remote_file:
                file_data = remote_file.read()
                df = pd.read_csv(BytesIO(file_data), **kwargs)
                return df

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def read_excel_file(self, remote_path: str, **kwargs: Any) -> pd.DataFrame:
        """
        Reads an Excel file (.xls or .xlsx) from the SFTP server into a pandas DataFrame.

        Keyword arguments are passed directly through to `pandas.read_excel`. See pandas documentation for details.

        Args:
            remote_path (str): Full path to the remote Excel file.
            **kwargs: Additional keyword arguments passed to `pandas.read_excel`.

        Returns:
            pd.DataFrame: A DataFrame containing the contents of the Excel file.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If the file is not an Excel file.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            client = SFTPClient(keys_file_name="my_sftp")
            client.connect()
            # Read an Excel file from the remote server
            df = client.read_excel_file("/remote/data.xlsx", sheet_name="Sheet1")
            print(df.head())
            client.close_connection()
            ```
        """
        _, ext = os.path.splitext(remote_path.lower())
        if ext not in [".xls", ".xlsx"]:
            raise ValueError(
                f"File {remote_path} does not have a valid Excel extension."
            )

        sftp_conn = self.connect()
        with sftp_conn:
            if not sftp_conn.exists(remote_path):
                raise FileNotFoundError(f"File {remote_path} does not exist.")
            with sftp_conn.open(remote_path, "rb") as remote_file:
                file_data = remote_file.read()
                df = pd.read_excel(BytesIO(file_data), **kwargs)
                return df

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def remove_directory(
        self,
        remote_path: str,
    ) -> None:
        """
        Removes an empty directory from the SFTP server.

        Args:
            remote_path (str): Full remote path of the directory to remove.

        Raises:
            FileNotFoundError: If the directory does not exist.
            OSError: If the directory is not empty.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            client = SFTPClient(keys_file_name="my_sftp")
            client.connect()
            # Remove an empty directory from the remote server
            client.remove_directory("/remote/old_folder")
            client.close_connection()
            ```
        """
        sftp_conn = self.connect()
        with sftp_conn:
            if not sftp_conn.exists(remote_path):
                raise FileNotFoundError(f"Directory {remote_path} does not exist.")
            if len(sftp_conn.listdir(remote_path)) > 0:
                raise OSError(f"Directory {remote_path} is not empty.")
            sftp_conn.rmdir(remote_path)

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def rename_file(
        self,
        remote_path: str,
        new_name: str,
    ) -> None:
        """
        Renames a file on the SFTP server.

        Args:
            remote_path (str): Full remote path of the file to rename.
            new_name (str): New name for the file (not a full path).

        Raises:
            FileNotFoundError: If the file does not exist.
            FileExistsError: If a file with the new name already exists.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            client = SFTPClient(keys_file_name="my_sftp")
            client.connect()
            # Rename a file on the remote server
            client.rename_file("/remote/report.csv", "report_2024.csv")
            client.close_connection()
            ```
        """
        sftp_conn = self.connect()
        with sftp_conn:
            if not sftp_conn.exists(remote_path):
                raise FileNotFoundError(f"File {remote_path} does not exist.")
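            directory = os.path.dirname(remote_path)
            # SFTP paths are POSIX-style, so join with "/" rather than the local OS separator.
            new_path = f"{directory.rstrip('/')}/{new_name}" if directory else new_name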
            if sftp_conn.exists(new_path):
                raise FileExistsError(f"File {new_path} already exists on SFTP server.")
            sftp_conn.rename(remote_path, new_path)

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def upload_file(
        self,
        local_path: str,
        remote_path: str,
        overwrite_existing: bool = False,
    ) -> None:
        """
        Uploads a local file to the remote SFTP server.

        Args:
            local_path (str): Path to the file on the local filesystem.
            remote_path (str): Full destination path on the SFTP server.
            overwrite_existing (bool): Whether to overwrite if the file already exists remotely.

        Raises:
            FileNotFoundError: If the local file or remote directory does not exist.
            FileExistsError: If the file exists and overwrite is not allowed.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            client = SFTPClient(keys_file_name="my_sftp")
            client.connect()
            # Upload a file to the remote server
            client.upload_file("./file.csv", "/remote/inbox/file.csv")
            client.close_connection()
            ```
        """
        sftp_conn = self.connect()
        with sftp_conn:
            if not os.path.exists(local_path):
                raise FileNotFoundError(f"Local file '{local_path}' does not exist.")
            if not sftp_conn.exists(os.path.dirname(remote_path)):
                raise FileNotFoundError(
                    f"Target directory '{os.path.dirname(remote_path)}' does not exist on SFTP."
                )

            if sftp_conn.exists(remote_path) and not overwrite_existing:
                raise FileExistsError(
                    f"File '{remote_path}' already exists and overwrite is disabled."
                )

            sftp_conn.put(local_path, remote_path)

    @retry(
        wait=wait_fixed(2),
        stop=stop_after_attempt(3),
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    )
    def write_csv_file(
        self,
        df: pd.DataFrame,
        remote_path: str,
        overwrite_existing: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Writes a pandas DataFrame to a CSV file on the SFTP server.

        Keyword arguments are passed directly through to `pandas.DataFrame.to_csv`. See pandas documentation for details.

        Args:
            df (pd.DataFrame): The DataFrame to write.
            remote_path (str): Full path to the target CSV file on the remote server.
            overwrite_existing (bool): Whether to overwrite an existing file.
            **kwargs: Additional arguments passed to `df.to_csv()`.

        Raises:
            FileExistsError: If the file already exists and overwrite is not allowed.
            FileNotFoundError: If the parent directory on the SFTP server does not exist.
            ValueError: If the target path does not end with '.csv'.

        Example:
            ```python
            import pandas as pd

            from physical_operations_utils.sftp_utils import SFTPClient

            client = SFTPClient(keys_file_name="my_sftp")
            client.connect()
            # Create a DataFrame
            df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
            # Write the DataFrame to a CSV file on the remote server
            client.write_csv_file(df, "/remote/outbox/results.csv", index=False)
            client.close_connection()
            ```
        """
        _, ext = os.path.splitext(remote_path.lower())
        if ext != ".csv":
            raise ValueError(f"Target path '{remote_path}' must end in '.csv'")

        remote_dir = os.path.dirname(remote_path)

        sftp_conn = self.connect()
        with sftp_conn:
            if not sftp_conn.exists(remote_dir):
                raise FileNotFoundError(
                    f"Target directory '{remote_dir}' does not exist on SFTP."
                )

            if sftp_conn.exists(remote_path) and not overwrite_existing:
                raise FileExistsError(
                    f"File '{remote_path}' already exists and overwrite is disabled."
                )

            csv_buffer = StringIO()
            df.to_csv(csv_buffer, **kwargs)
            csv_buffer.seek(0)

            with sftp_conn.open(remote_path, "w") as remote_file:
                remote_file.write(csv_buffer.read())

    def _validate_sftp_config(self, sftp_config: dict) -> dict:
        """
        Validates the SFTP configuration dictionary. This method is private and
        should not be called directly. It is called during initialization to ensure
        that the provided configuration is valid.

        Args:
            sftp_config (dict): The SFTP configuration dictionary.

        Returns:
            dict: The validated SFTP configuration dictionary.

        Raises:
            KeyError: If required keys are missing.
            ValueError: If the port is not an integer.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            sftp_config = {
                "host": "sftp.example.com",
                "port": 22,
                "username": "user",
                "secret": "my-sftp-password-secret"
            }
            client = SFTPClient(sftp_config=sftp_config)
            ```
        """
        required_keys = ["host", "port", "username", "secret"]
        for key in required_keys:
            if key not in sftp_config:
                raise KeyError(f"Missing required SFTP configuration key: {key}")
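        try:
            sftp_config["port"] = int(sftp_config["port"])
        except (TypeError, ValueError) as exc:
            # int(None) raises TypeError, so catch both to honor the documented ValueError.
            raise ValueError("Port must be an integer.") from exc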
        return sftp_config

    def __enter__(self) -> "SFTPClient":
        """
        Allows the use of the SFTPClient instance as a context manager.
        Automatically connects to the SFTP server when entering the context.

        Returns:
            SFTPClient: The SFTPClient instance itself.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            with SFTPClient(keys_file_name="my_sftp") as client:
                # Perform SFTP operations
                client.upload_file("local_file.csv", "/remote/dir/data.csv")
            ```
        """
        self.connect()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """
        Automatically closes the SFTP connection when exiting the context.
        This method is called when leaving the context manager block.

        Args:
            exc_type: The exception type, if any.
            exc_val: The exception value, if any.
            exc_tb: The traceback object, if any.

        Example:
            ```python
            from physical_operations_utils.sftp_utils import SFTPClient

            with SFTPClient(keys_file_name="my_sftp") as client:
                # Perform SFTP operations
                client.upload_file("local_file.csv", "/remote/dir/data.csv")
            ```
        """
        self.close_connection()

__enter__()

Allows the use of the SFTPClient instance as a context manager. Automatically connects to the SFTP server when entering the context.

Returns:

Name Type Description
SFTPClient SFTPClient

The SFTPClient instance itself.

Example
from physical_operations_utils.sftp_utils import SFTPClient

with SFTPClient(keys_file_name="my_sftp") as client:
    # Perform SFTP operations
    client.upload_file("local_file.csv", "/remote/dir/data.csv")
Source code in physical_operations_utils/sftp_utils.py
def __enter__(self) -> "SFTPClient":
    """
    Allows the use of the SFTPClient instance as a context manager.
    Automatically connects to the SFTP server when entering the context.

    Returns:
        SFTPClient: The SFTPClient instance itself.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        with SFTPClient(keys_file_name="my_sftp") as client:
            # Perform SFTP operations
            client.upload_file("local_file.csv", "/remote/dir/data.csv")
        ```
    """
    self.connect()
    return self

__exit__(exc_type, exc_val, exc_tb)

Automatically closes the SFTP connection when exiting the context. This method is called when leaving the context manager block.

Parameters:

Name Type Description Default
exc_type Optional[Type[BaseException]]

The exception type, if any.

required
exc_val Optional[BaseException]

The exception value, if any.

required
exc_tb Optional[TracebackType]

The traceback object, if any.

required
Example
from physical_operations_utils.sftp_utils import SFTPClient

with SFTPClient(keys_file_name="my_sftp") as client:
    # Perform SFTP operations
    client.upload_file("local_file.csv", "/remote/dir/data.csv")
Source code in physical_operations_utils/sftp_utils.py
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> None:
    """
    Automatically closes the SFTP connection when exiting the context.
    This method is called when leaving the context manager block.

    Args:
        exc_type: The exception type, if any.
        exc_val: The exception value, if any.
        exc_tb: The traceback object, if any.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        with SFTPClient(keys_file_name="my_sftp") as client:
            # Perform SFTP operations
            client.upload_file("local_file.csv", "/remote/dir/data.csv")
        ```
    """
    self.close_connection()
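
To make the enter/exit behavior concrete, here is a minimal, self-contained sketch of the same pattern. `FakeClient` and its `events` list are hypothetical stand-ins, not part of `sftp_utils`. The sketch only illustrates that `__exit__` runs (and closes the connection) even when the `with` body raises, and that returning `None` lets the exception propagate to the caller.

```python
class FakeClient:
    """Hypothetical stand-in that mimics the connect/close lifecycle."""

    def __init__(self):
        self.events = []

    def connect(self):
        self.events.append("connect")

    def close_connection(self):
        self.events.append("close")

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returning None (falsy) means exceptions from the body are not suppressed.
        self.close_connection()


client = FakeClient()
try:
    with client:
        raise RuntimeError("simulated failure inside the with block")
except RuntimeError:
    pass

print(client.events)
```

The close step is recorded even though the body raised, which is why the `with` form is usually safer than pairing `connect()` and `close_connection()` by hand.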

close_connection()

Closes the active SFTP connection and resets the connection state.

This method should be called after using a manually-managed connection to ensure the session is cleanly closed. It is also automatically used when exiting a with SFTPClient(...) context manager block.

If no connection is active, this method does nothing.

Makes up to 3 attempts, waiting 2 seconds between them, if the close raises a ConnectionError or TimeoutError.

Example
from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
client.connect()
# perform custom SFTP operations...
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def close_connection(self):
    """
    Closes the active SFTP connection and resets the connection state.

    This method should be called after using a manually-managed connection to
    ensure the session is cleanly closed. It is also automatically used when
    exiting a `with SFTPClient(...)` context manager block.

    If no connection is active, this method does nothing.

    Makes up to 3 attempts, waiting 2 seconds between them, if the close raises a
    ConnectionError or TimeoutError.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        client.connect()
        # perform custom SFTP operations...
        client.close_connection()
        ```
    """
    if self._connection:
        self._connection.close()
        self._connection = None
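
The decorator in the listing above combines tenacity's `wait_fixed(2)`, `stop_after_attempt(3)`, and `retry_if_exception_type(...)`. The following is a hand-rolled, standard-library approximation of that policy, not tenacity itself; `retry_on` and `flaky_close` are hypothetical names. It illustrates that three attempts are made in total and that only the listed exception types trigger another attempt. One difference to keep in mind: tenacity by default wraps the final failure in a `RetryError` unless `reraise=True` is passed, whereas this sketch re-raises the original exception.

```python
import functools
import time


def retry_on(exceptions, attempts=3, wait_seconds=2.0):
    """Call the function up to `attempts` times, retrying only on `exceptions`."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == attempts:
                        raise  # attempts exhausted: surface the last error
                    time.sleep(wait_seconds)

        return wrapper

    return decorator


calls = {"count": 0}


@retry_on((ConnectionError, TimeoutError), attempts=3, wait_seconds=0)
def flaky_close():
    # Fails twice with a retryable error, then succeeds on the third attempt.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "closed"


result = flaky_close()
print(result, calls["count"])
```

Errors outside the listed types, such as `FileNotFoundError`, pass straight through on the first attempt.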

connect()

Establishes and returns a reusable connection to the SFTP server.

Uses the credentials provided at initialization to establish a connection to the remote SFTP server. The connection is cached and reused for all future operations within the same SFTPClient instance.

This method is automatically called when using any of the public methods (e.g., upload_file, download_file, etc.), but can also be used manually when custom SFTP operations are needed.

Makes up to 3 attempts, waiting 2 seconds between them, if the connection raises a ConnectionError or TimeoutError.

Returns:

Type Description
Connection

pysftp.Connection: A live SFTP connection object that can be used for custom operations.

Raises:

Type Description
Exception

If the connection attempt fails after all retry attempts.

Example
from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
sftp_conn = client.connect()
files = sftp_conn.listdir("/remote/path")
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def connect(self) -> pysftp.Connection:
    """
    Establishes and returns a reusable connection to the SFTP server.

    Uses the credentials provided at initialization to establish a connection
    to the remote SFTP server. The connection is cached and reused for all future
    operations within the same `SFTPClient` instance.

    This method is automatically called when using any of the public methods
    (e.g., `upload_file`, `download_file`, etc.), but can also be used manually
    when custom SFTP operations are needed.

    Makes up to 3 attempts, waiting 2 seconds between them, if the connection raises a
    ConnectionError or TimeoutError.

    Returns:
        pysftp.Connection: A live SFTP connection object that can be used for custom operations.

    Raises:
        Exception: If the connection attempt fails after all retry attempts.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        sftp_conn = client.connect()
        files = sftp_conn.listdir("/remote/path")
        client.close_connection()
        ```
    """
    try:
        return pysftp.Connection(
            host=self.sftp_config["host"],
            username=self.sftp_config["username"],
            password=self.sftp_config["password"],
            port=self.sftp_config["port"],
            cnopts=self.sftp_config["cnopts"],
        )
    except Exception as e:
        logging.error(f"Failed to connect to SFTP server: {e}")
        raise
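
The docstring describes a cached, reusable connection, while the listing above returns a fresh `pysftp.Connection` on each call. As a sketch only, one way such caching could look is shown below, assuming a `_connection` attribute (the same one `close_connection` resets). `CachingClient`, `FakeConnection`, and `connection_factory` are hypothetical; a real implementation would presumably pass the `pysftp.Connection(...)` call from the listing as the factory.

```python
class CachingClient:
    """Hypothetical sketch of connection reuse; not the library's code."""

    def __init__(self, connection_factory):
        self._connection_factory = connection_factory
        self._connection = None

    def connect(self):
        # Create the connection lazily and reuse it on later calls.
        if self._connection is None:
            self._connection = self._connection_factory()
        return self._connection

    def close_connection(self):
        if self._connection is not None:
            self._connection.close()
            self._connection = None


class FakeConnection:
    """Stand-in for a real connection object."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


client = CachingClient(FakeConnection)
first = client.connect()
second = client.connect()
print(first is second)

client.close_connection()
third = client.connect()
print(third is first)
```

With this pattern, wrapping the cached connection in `with sftp_conn:` inside each method would close it after the first operation, so the two approaches should not be mixed.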

create_directory(remote_path)

Creates a new directory on the remote SFTP server.

Parameters:

Name Type Description Default
remote_path str

The full remote path to the directory to be created.

required

Raises:

Type Description
FileExistsError

If the directory already exists on the remote server.

Example
from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
client.connect()
# Create a new directory on the remote server
client.create_directory("/remote/new_folder")
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def create_directory(
    self,
    remote_path: str,
) -> None:
    """
    Creates a new directory on the remote SFTP server.

    Args:
        remote_path (str): The full remote path to the directory to be created.

    Raises:
        FileExistsError: If the directory already exists on the remote server.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        client.connect()
        # Create a new directory on the remote server
        client.create_directory("/remote/new_folder")
        client.close_connection()
        ```
    """
    sftp_conn = self.connect()
    with sftp_conn:
        if sftp_conn.exists(remote_path):
            raise FileExistsError(f"Directory {remote_path} already exists.")
        sftp_conn.makedirs(remote_path)

delete_file(remote_path)

Deletes a file from the remote SFTP server.

Parameters:

Name Type Description Default
remote_path str

The full remote path of the file to delete.

required

Raises:

Type Description
FileNotFoundError

If the file does not exist.

Example
from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
client.connect()
# Delete a file from the remote server
client.delete_file("/remote/file.csv")
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def delete_file(self, remote_path: str) -> None:
    """
    Deletes a file from the remote SFTP server.

    Args:
        remote_path (str): The full remote path of the file to delete.

    Raises:
        FileNotFoundError: If the file does not exist.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        client.connect()
        # Delete a file from the remote server
        client.delete_file("/remote/file.csv")
        client.close_connection()
        ```
    """
    sftp_conn = self.connect()
    with sftp_conn:
        if not sftp_conn.exists(remote_path):
            raise FileNotFoundError(f"File {remote_path} does not exist.")
        sftp_conn.remove(remote_path)

download_file(remote_path, local_path, overwrite_existing=False)

Downloads a file from the SFTP server to the local filesystem.

Parameters:

Name Type Description Default
remote_path str

The full path to the file on the remote SFTP server.

required
local_path str

The full path on the local machine where the file will be saved.

required
overwrite_existing bool

Whether to overwrite the file if it already exists locally.

False

Raises:

Type Description
FileNotFoundError

If the remote file does not exist.

FileExistsError

If the local file exists and overwrite is disabled.

Example
from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
client.connect()
# Download a file from the remote server
client.download_file("/remote/file.csv", "./file.csv")
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def download_file(
    self,
    remote_path: str,
    local_path: str,
    overwrite_existing: bool = False,
) -> None:
    """
    Downloads a file from the SFTP server to the local filesystem.

    Args:
        remote_path (str): The full path to the file on the remote SFTP server.
        local_path (str): The full path on the local machine where the file will be saved.
        overwrite_existing (bool): Whether to overwrite the file if it already exists locally.

    Raises:
        FileNotFoundError: If the remote file does not exist.
        FileExistsError: If the local file exists and overwrite is disabled.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        client.connect()
        # Download a file from the remote server
        client.download_file("/remote/file.csv", "./file.csv")
        client.close_connection()
        ```
    """
    sftp_conn = self.connect()
    with sftp_conn:
        if not sftp_conn.exists(remote_path):
            raise FileNotFoundError(f"File {remote_path} does not exist.")
        if os.path.exists(local_path) and not overwrite_existing:
            raise FileExistsError(
                f"Local file '{local_path}' already exists and overwrite is disabled."
            )
        sftp_conn.get(remote_path, local_path)
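
The local overwrite guard can be exercised without an SFTP server. The sketch below isolates that check in a hypothetical helper, `check_local_target`, and runs it against a temporary directory; it mirrors the condition in the listing above but is not part of `sftp_utils`.

```python
import os
import tempfile


def check_local_target(local_path, overwrite_existing=False):
    """Raise FileExistsError unless overwriting is explicitly allowed."""
    if os.path.exists(local_path) and not overwrite_existing:
        raise FileExistsError(
            f"Local file '{local_path}' already exists and overwrite is disabled."
        )


with tempfile.TemporaryDirectory() as tmp_dir:
    target = os.path.join(tmp_dir, "file.csv")
    check_local_target(target)  # passes: nothing exists yet

    with open(target, "w") as handle:
        handle.write("a,b\n1,2\n")

    try:
        check_local_target(target)
        blocked = False
    except FileExistsError:
        blocked = True

    check_local_target(target, overwrite_existing=True)  # passes: overwrite allowed

print(blocked)
```

In practice this means a second `download_file` call to the same `local_path` needs `overwrite_existing=True`.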

list_directories(remote_path)

Lists all subdirectories in a remote directory.

Parameters:

Name Type Description Default
remote_path str

Path to the remote directory to inspect.

required

Returns:

Type Description
List[str]

List[str]: A list of subdirectory names.

Raises:

Type Description
FileNotFoundError

If the path does not exist on the remote server.

Example
from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
client.connect()
# List all subdirectories in a remote directory
folders = client.list_directories("/remote/data")
print(folders)
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def list_directories(self, remote_path: str) -> List[str]:
    """
    Lists all subdirectories in a remote directory.

    Args:
        remote_path (str): Path to the remote directory to inspect.

    Returns:
        List[str]: A list of subdirectory names.

    Raises:
        FileNotFoundError: If the path does not exist on the remote server.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        client.connect()
        # List all subdirectories in a remote directory
        folders = client.list_directories("/remote/data")
        print(folders)
        client.close_connection()
        ```
    """
    sftp_conn = self.connect()
    with sftp_conn:
        if not sftp_conn.exists(remote_path):
            raise FileNotFoundError(f"Path {remote_path} does not exist.")
        return [
            item.filename
            for item in sftp_conn.listdir_attr(remote_path)
            if stat.S_ISDIR(item.st_mode)
        ]
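
The directory filter relies on `stat.S_ISDIR` applied to each entry's `st_mode`. A minimal sketch of that filter follows, using hypothetical `SimpleNamespace` objects in place of the attribute objects returned by `listdir_attr()`.

```python
import stat
from types import SimpleNamespace

# Hypothetical entries shaped like the objects returned by listdir_attr().
entries = [
    SimpleNamespace(filename="archive", st_mode=stat.S_IFDIR | 0o755),
    SimpleNamespace(filename="data.csv", st_mode=stat.S_IFREG | 0o644),
    SimpleNamespace(filename="inbox", st_mode=stat.S_IFDIR | 0o700),
]

# Keep only entries whose mode bits mark them as directories.
directories = [item.filename for item in entries if stat.S_ISDIR(item.st_mode)]
print(directories)
```

Regular files are dropped, so only `archive` and `inbox` remain in the result.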

list_file_details(remote_path)

Returns a list of metadata dictionaries for each file/directory in a remote directory.

Parameters:

Name Type Description Default
remote_path str

The remote path to inspect.

required

Returns:

Type Description
List[dict]

List[dict]: A list of dictionaries containing file metadata.

Raises:

Type Description
FileNotFoundError

If the path does not exist on the remote server.

Example
from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
client.connect()
# List all files in a remote directory
details = client.list_file_details("/remote")
for file in details:
    print(file)
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def list_file_details(self, remote_path: str) -> List[dict]:
    """
    Returns a list of metadata dictionaries for each file/directory in a remote directory.

    Args:
        remote_path (str): The remote path to inspect.

    Returns:
        List[dict]: A list of dictionaries containing file metadata.

    Raises:
        FileNotFoundError: If the path does not exist on the remote server.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        client.connect()
        # List all files in a remote directory
        details = client.list_file_details("/remote")
        for file in details:
            print(file)
        client.close_connection()
        ```
    """
    sftp_conn = self.connect()
    with sftp_conn:
        if not sftp_conn.exists(remote_path):
            raise FileNotFoundError(f"Path {remote_path} does not exist.")
        return [
            {
                "filename": item.filename,
                "size": item.st_size,
                "permissions": stat.filemode(item.st_mode),
                "last_modified": item.st_mtime,
                "last_accessed": item.st_atime,
                "is_directory": stat.S_ISDIR(item.st_mode),
            }
            for item in sftp_conn.listdir_attr(remote_path)
        ]
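
As an illustration of how the returned dictionaries might be consumed, the sketch below builds one hypothetical record by hand (the values are invented for the example) and converts its timestamp. It assumes `last_modified` and `last_accessed` are Unix timestamps in seconds, which is how SFTP attribute objects normally report them.

```python
import stat
from datetime import datetime, timezone

# Hypothetical record in the shape returned by list_file_details().
record = {
    "filename": "data.csv",
    "size": 2048,
    "permissions": stat.filemode(stat.S_IFREG | 0o644),
    "last_modified": 1700000000,
    "last_accessed": 1700000000,
    "is_directory": False,
}

# Convert the epoch seconds into a timezone-aware UTC datetime.
modified_at = datetime.fromtimestamp(record["last_modified"], tz=timezone.utc)
print(record["permissions"], modified_at.isoformat())
```

The `permissions` value is the familiar `ls -l` style string produced by `stat.filemode`.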

list_files(path_on_sftp)

Lists the names of all files and directories in a given remote path.

Parameters:

Name Type Description Default
path_on_sftp str

Remote directory path to list contents from.

required

Returns:

Type Description
List[str]

List[str]: A list of filenames and folder names in the specified path.

Example
from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
client.connect()
# List all files and directories in a remote path
files = client.list_files("/remote")
print(files)
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def list_files(self, path_on_sftp: str) -> List[str]:
    """
    Lists the names of all files and directories in a given remote path.

    Args:
        path_on_sftp (str): Remote directory path to list contents from.

    Returns:
        List[str]: A list of filenames and folder names in the specified path.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        client.connect()
        # List all files and directories in a remote path
        files = client.list_files("/remote")
        print(files)
        client.close_connection()
        ```
    """
    sftp_conn = self.connect()
    with sftp_conn:
        files = sftp_conn.listdir(path_on_sftp)
        return files

move_file(remote_path, new_path)

Moves a file on the SFTP server.

Parameters:

Name Type Description Default
remote_path str

The current path of the file.

required
new_path str

The new path or name for the file.

required

Raises:

Type Description
FileNotFoundError

If the source file does not exist.

FileExistsError

If a file already exists at the destination path.

Example
from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
client.connect()
# Move a file on the remote server
client.move_file("/remote/old.csv", "/remote/archive/old.csv")
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def move_file(
    self,
    remote_path: str,
    new_path: str,
) -> None:
    """
    Moves a file on the SFTP server.

    Args:
        remote_path (str): The current path of the file.
        new_path (str): The new path or name for the file.

    Raises:
        FileNotFoundError: If the source file does not exist.
        FileExistsError: If a file already exists at the destination path.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        client.connect()
        # Move a file on the remote server
        client.move_file("/remote/old.csv", "/remote/archive/old.csv")
        client.close_connection()
        ```
    """
    sftp_conn = self.connect()
    with sftp_conn:
        if not sftp_conn.exists(remote_path):
            raise FileNotFoundError(f"File {remote_path} does not exist.")
        if sftp_conn.exists(new_path):
            raise FileExistsError(f"File {new_path} already exists on SFTP server.")
        sftp_conn.rename(remote_path, new_path)

read_csv_file(remote_path, **kwargs)

Reads a CSV file from the SFTP server into a pandas DataFrame.

Keyword arguments are passed directly through to pandas.read_csv. See pandas documentation for details.

Parameters:

Name Type Description Default
remote_path str

Full path to the remote CSV file.

required
**kwargs Any

Additional keyword arguments passed to pandas.read_csv.

{}

Returns:

Type Description
DataFrame

pd.DataFrame: A DataFrame containing the contents of the CSV file.

Raises:

Type Description
FileNotFoundError

If the file does not exist on the SFTP server.

ValueError

If the file extension is not '.csv'.

Example
from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
client.connect()
# Read a CSV file from the remote server
df = client.read_csv_file("/remote/data.csv", sep=",")
print(df.head())
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def read_csv_file(self, remote_path: str, **kwargs: Any) -> pd.DataFrame:
    """
    Reads a CSV file from the SFTP server into a pandas DataFrame.

    Keyword arguments are passed directly through to `pandas.read_csv`. See pandas documentation for details.

    Args:
        remote_path (str): Full path to the remote CSV file.
        **kwargs: Additional keyword arguments passed to `pandas.read_csv`.

    Returns:
        pd.DataFrame: A DataFrame containing the contents of the CSV file.

    Raises:
        FileNotFoundError: If the file does not exist on the SFTP server.
        ValueError: If the file extension is not '.csv'.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        client.connect()
        # Read a CSV file from the remote server
        df = client.read_csv_file("/remote/data.csv", sep=",")
        print(df.head())
        client.close_connection()
        ```
    """
    _, ext = os.path.splitext(remote_path.lower())
    if ext != ".csv":
        raise ValueError(f"File {remote_path} does not have a valid CSV extension.")

    sftp_conn = self.connect()
    with sftp_conn:
        if not sftp_conn.exists(remote_path):
            raise FileNotFoundError(f"File {remote_path} does not exist.")
        with sftp_conn.open(remote_path, "rb") as remote_file:
            file_data = remote_file.read()
            df = pd.read_csv(BytesIO(file_data), **kwargs)
            return df

read_excel_file(remote_path, **kwargs)

Reads an Excel file (.xls or .xlsx) from the SFTP server into a pandas DataFrame.

Keyword arguments are passed directly through to pandas.read_excel. See pandas documentation for details.

Parameters:

Name Type Description Default
remote_path str

Full path to the remote Excel file.

required
**kwargs Any

Additional keyword arguments passed to pandas.read_excel.

{}

Returns:

Type Description
DataFrame

pd.DataFrame: A DataFrame containing the contents of the Excel file.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

ValueError

If the file is not an Excel file.

Example
from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
client.connect()
# Read an Excel file from the remote server
df = client.read_excel_file("/remote/data.xlsx", sheet_name="Sheet1")
print(df.head())
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def read_excel_file(self, remote_path: str, **kwargs: Any) -> pd.DataFrame:
    """
    Reads an Excel file (.xls or .xlsx) from the SFTP server into a pandas DataFrame.

    Keyword arguments are passed directly through to `pandas.read_excel`. See pandas documentation for details.

    Args:
        remote_path (str): Full path to the remote Excel file.
        **kwargs: Additional keyword arguments passed to `pandas.read_excel`.

    Returns:
        pd.DataFrame: A DataFrame containing the contents of the Excel file.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If the file is not an Excel file.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        client.connect()
        # Read an Excel file from the remote server
        df = client.read_excel_file("/remote/data.xlsx", sheet_name="Sheet1")
        print(df.head())
        client.close_connection()
        ```
    """
    _, ext = os.path.splitext(remote_path.lower())
    if ext not in [".xls", ".xlsx"]:
        raise ValueError(
            f"File {remote_path} does not have a valid Excel extension."
        )

    sftp_conn = self.connect()
    with sftp_conn:
        if not sftp_conn.exists(remote_path):
            raise FileNotFoundError(f"File {remote_path} does not exist.")
        with sftp_conn.open(remote_path, "rb") as remote_file:
            file_data = remote_file.read()
            df = pd.read_excel(BytesIO(file_data), **kwargs)
            return df
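
Both `read_csv_file` and `read_excel_file` validate the extension before opening a connection. The sketch below isolates that check in a hypothetical helper, `has_allowed_extension`, to show which paths pass: the comparison is case-insensitive and only the final extension counts.

```python
import os


def has_allowed_extension(remote_path, allowed):
    """Case-insensitive check of the final extension, as in the listings above."""
    _, ext = os.path.splitext(remote_path.lower())
    return ext in allowed


excel_extensions = [".xls", ".xlsx"]

upper_case = has_allowed_extension("/remote/Data.XLSX", excel_extensions)
double_suffix = has_allowed_extension("/remote/data.xlsx.bak", excel_extensions)
macro_workbook = has_allowed_extension("/remote/data.xlsm", excel_extensions)

print(upper_case, double_suffix, macro_workbook)
```

Files such as `.xlsm` workbooks are rejected by this check even though pandas may be able to read them.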

remove_directory(remote_path)

Removes an empty directory from the SFTP server.

Parameters:

Name Type Description Default
remote_path str

Full remote path of the directory to remove.

required

Raises:

Type Description
FileNotFoundError

If the directory does not exist.

OSError

If the directory is not empty.

Example
from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
client.connect()
# Remove an empty directory from the remote server
client.remove_directory("/remote/old_folder")
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def remove_directory(
    self,
    remote_path: str,
) -> None:
    """
    Removes an empty directory from the SFTP server.

    Args:
        remote_path (str): Full remote path of the directory to remove.

    Raises:
        FileNotFoundError: If the directory does not exist.
        OSError: If the directory is not empty.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        client.connect()
        # Remove an empty directory from the remote server
        client.remove_directory("/remote/old_folder")
        client.close_connection()
        ```
    """
    sftp_conn = self.connect()
    with sftp_conn:
        if not sftp_conn.exists(remote_path):
            raise FileNotFoundError(f"Directory {remote_path} does not exist.")
        if len(sftp_conn.listdir(remote_path)) > 0:
            raise OSError(f"Directory {remote_path} is not empty.")
        sftp_conn.rmdir(remote_path)

rename_file(remote_path, new_name)

Renames a file on the SFTP server.

Parameters:

Name Type Description Default
remote_path str

Full remote path of the file to rename.

required
new_name str

New name for the file (not a full path).

required

Raises:

Type Description
FileNotFoundError

If the file does not exist.

FileExistsError

If a file with the new name already exists.

Example
from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
client.connect()
# Rename a file on the remote server
client.rename_file("/remote/report.csv", "report_2024.csv")
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def rename_file(
    self,
    remote_path: str,
    new_name: str,
) -> None:
    """
    Renames a file on the SFTP server.

    Args:
        remote_path (str): Full remote path of the file to rename.
        new_name (str): New name for the file (not a full path).

    Raises:
        FileNotFoundError: If the file does not exist.
        FileExistsError: If a file with the new name already exists.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        client.connect()
        # Rename a file on the remote server
        client.rename_file("/remote/report.csv", "report_2024.csv")
        client.close_connection()
        ```
    """
    sftp_conn = self.connect()
    with sftp_conn:
        if not sftp_conn.exists(remote_path):
            raise FileNotFoundError(f"File {remote_path} does not exist.")
        directory = os.path.dirname(remote_path)
        new_path = os.path.join(directory, new_name)
        if sftp_conn.exists(new_path):
            raise FileExistsError(f"File {new_path} already exists on SFTP server.")
        sftp_conn.rename(remote_path, new_path)
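
The target path is built from the directory of `remote_path` plus `new_name`. The sketch below isolates that derivation so it can be checked without a server. It is an illustration under assumptions: the helper name `renamed_remote_path` is hypothetical, and it uses `posixpath` instead of the `os.path` calls in the source, so that the separator is always `/` regardless of the local operating system.

```python
import posixpath


def renamed_remote_path(remote_path: str, new_name: str) -> str:
    """Return the sibling path of remote_path whose final component is new_name."""
    directory = posixpath.dirname(remote_path)
    return posixpath.join(directory, new_name)


print(renamed_remote_path("/remote/report.csv", "report_2024.csv"))
# /remote/report_2024.csv
```

Because `join` discards the directory when its second argument is absolute, `new_name` should be a bare file name, as the parameter description states.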

upload_file(local_path, remote_path, overwrite_existing=False)

Uploads a local file to the remote SFTP server.

Parameters:

Name Type Description Default
local_path str

Path to the file on the local filesystem.

required
remote_path str

Full destination path on the SFTP server.

required
overwrite_existing bool

Whether to overwrite if the file already exists remotely.

False

Raises:

Type Description
FileNotFoundError

If the local file or remote directory does not exist.

FileExistsError

If the file exists and overwrite is not allowed.

Example
from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
client.connect()
# Upload a file to the remote server
client.upload_file("./file.csv", "/remote/inbox/file.csv")
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def upload_file(
    self,
    local_path: str,
    remote_path: str,
    overwrite_existing: bool = False,
) -> None:
    """
    Uploads a local file to the remote SFTP server.

    Args:
        local_path (str): Path to the file on the local filesystem.
        remote_path (str): Full destination path on the SFTP server.
        overwrite_existing (bool): Whether to overwrite if the file already exists remotely.

    Raises:
        FileNotFoundError: If the local file or remote directory does not exist.
        FileExistsError: If the file exists and overwrite is not allowed.

    Example:
        ```python
        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        client.connect()
        # Upload a file to the remote server
        client.upload_file("./file.csv", "/remote/inbox/file.csv")
        client.close_connection()
        ```
    """
    sftp_conn = self.connect()
    with sftp_conn:
        if not os.path.exists(local_path):
            raise FileNotFoundError(f"Local file '{local_path}' does not exist.")
        if not sftp_conn.exists(os.path.dirname(remote_path)):
            raise FileNotFoundError(
                f"Target directory '{os.path.dirname(remote_path)}' does not exist on SFTP."
            )

        if sftp_conn.exists(remote_path) and not overwrite_existing:
            raise FileExistsError(
                f"File '{remote_path}' already exists and overwrite is disabled."
            )

        sftp_conn.put(local_path, remote_path)

write_csv_file(df, remote_path, overwrite_existing=False, **kwargs)

Writes a pandas DataFrame to a CSV file on the SFTP server.

Keyword arguments are passed through unchanged to pandas.DataFrame.to_csv; see the pandas documentation for the available options.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to write.

required
remote_path str

Full path to the target CSV file on the remote server.

required
overwrite_existing bool

Whether to overwrite an existing file.

False
**kwargs Any

Additional arguments passed to df.to_csv().

{}

Raises:

Type Description
FileExistsError

If the file already exists and overwrite is not allowed.

FileNotFoundError

If the parent directory on the SFTP server does not exist.

ValueError

If the target path does not end with '.csv'.

Example
import pandas as pd

from physical_operations_utils.sftp_utils import SFTPClient

client = SFTPClient(keys_file_name="my_sftp")
client.connect()
# Create a DataFrame
df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
# Write the DataFrame to a CSV file on the remote server
client.write_csv_file(df, "/remote/outbox/results.csv", index=False)
client.close_connection()
Source code in physical_operations_utils/sftp_utils.py
@retry(
    wait=wait_fixed(2),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def write_csv_file(
    self,
    df: pd.DataFrame,
    remote_path: str,
    overwrite_existing: bool = False,
    **kwargs: Any,
) -> None:
    """
    Writes a pandas DataFrame to a CSV file on the SFTP server.

    Keyword arguments are passed through unchanged to `pandas.DataFrame.to_csv`; see the pandas documentation for the available options.

    Args:
        df (pd.DataFrame): The DataFrame to write.
        remote_path (str): Full path to the target CSV file on the remote server.
        overwrite_existing (bool): Whether to overwrite an existing file.
        **kwargs: Additional arguments passed to `df.to_csv()`.

    Raises:
        FileExistsError: If the file already exists and overwrite is not allowed.
        FileNotFoundError: If the parent directory on the SFTP server does not exist.
        ValueError: If the target path does not end with '.csv'.

    Example:
        ```python
        import pandas as pd

        from physical_operations_utils.sftp_utils import SFTPClient

        client = SFTPClient(keys_file_name="my_sftp")
        client.connect()
        # Create a DataFrame
        df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
        # Write the DataFrame to a CSV file on the remote server
        client.write_csv_file(df, "/remote/outbox/results.csv", index=False)
        client.close_connection()
        ```
    """
    _, ext = os.path.splitext(remote_path.lower())
    if ext != ".csv":
        raise ValueError(f"Target path '{remote_path}' must end in '.csv'")

    remote_dir = os.path.dirname(remote_path)

    sftp_conn = self.connect()
    with sftp_conn:
        if not sftp_conn.exists(remote_dir):
            raise FileNotFoundError(
                f"Target directory '{remote_dir}' does not exist on SFTP."
            )

        if sftp_conn.exists(remote_path) and not overwrite_existing:
            raise FileExistsError(
                f"File '{remote_path}' already exists and overwrite is disabled."
            )

        csv_buffer = StringIO()
        df.to_csv(csv_buffer, **kwargs)
        csv_buffer.seek(0)

        with sftp_conn.open(remote_path, "w") as remote_file:
            remote_file.write(csv_buffer.read())
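
The extension check at the top of the function is case-insensitive and looks only at the final suffix of the path. The sketch below reproduces that check in isolation to show which paths pass; the helper name `has_csv_extension` is hypothetical and is not part of the module.

```python
import os


def has_csv_extension(remote_path: str) -> bool:
    """Mirror the check in write_csv_file: the final suffix must be '.csv'."""
    _, ext = os.path.splitext(remote_path.lower())
    return ext == ".csv"


for path in (
    "/remote/outbox/results.csv",
    "/remote/outbox/RESULTS.CSV",
    "/remote/outbox/results.csv.gz",
    "/remote/outbox/results",
):
    print(path, has_csv_extension(path))
```

Only the first two paths pass. A compound suffix such as `.csv.gz` is rejected, so compressed output needs a different upload route.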

delete_oldest_files_from_sftp(path_on_sftp, keep_n_files, keys_file_name=None, sftp_config=None, remove_filename_independent=False, filename_starts_with=None, filename_ends_with=None, filename_contains=None)

Deletes the oldest files from an SFTP directory, keeping only the specified number of most recently modified ones.

The function supports both filename-dependent and filename-independent deletion. If remove_filename_independent is True, the oldest files are deleted regardless of their names, and no filename filter may be passed. If it is False, exactly one filename filter (filename_starts_with, filename_ends_with, or filename_contains) must be passed, and only files matching it are considered for deletion.

Either keys_file_name for KEYS_FILE usage or sftp_config (with direct credentials) must be provided.
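
The selection step can be sketched without a server: filter the listing, sort it oldest first, and take everything except the newest `keep_n_files` entries. This is an illustration under assumptions, not the function's code. The helper `select_files_to_delete` is hypothetical, only the `starts_with` filter is shown, and `last_modified` is assumed to be a sortable value such as a `datetime`. The sketch computes the number of deletions explicitly, so `keep_n_files=0` selects every matching file.

```python
from datetime import datetime


def select_files_to_delete(files, keep_n_files, starts_with=None):
    """Return the oldest entries, leaving the keep_n_files newest untouched."""
    if starts_with is not None:
        files = [f for f in files if f["filename"].startswith(starts_with)]
    ordered = sorted(files, key=lambda f: f["last_modified"])  # oldest first
    n_to_delete = max(len(ordered) - keep_n_files, 0)
    return ordered[:n_to_delete]


listing = [
    {"filename": "log_a.txt", "last_modified": datetime(2024, 1, 1)},
    {"filename": "log_b.txt", "last_modified": datetime(2024, 1, 3)},
    {"filename": "log_c.txt", "last_modified": datetime(2024, 1, 2)},
    {"filename": "data.csv", "last_modified": datetime(2023, 12, 1)},
]

to_delete = select_files_to_delete(listing, keep_n_files=2, starts_with="log_")
print([f["filename"] for f in to_delete])
# ['log_a.txt']
```

`data.csv` is the oldest entry in the listing but does not match the filter, so it is never a deletion candidate.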

Parameters:

Name Type Description Default
path_on_sftp str

The remote directory path on the SFTP server.

required
keep_n_files int

The number of most recently modified files to keep.

required
keys_file_name str

Name of the key in the YAML configuration for credentials.

None
sftp_config dict

Dictionary with keys "host", "port", "username", and "secret".

None
remove_filename_independent bool

If True, deletes files without considering filename patterns.

False
filename_starts_with str

Match files whose names start with this string. Only active when remove_filename_independent is False. Filename filters are mutually exclusive.

None
filename_ends_with str

Match files whose names end with this string. Only active when remove_filename_independent is False. Filename filters are mutually exclusive.

None
filename_contains str

Match files whose names contain this string. Only active when remove_filename_independent is False. Filename filters are mutually exclusive.

None

Returns:

Name Type Description
int int

The number of files deleted.

Raises:

Type Description
ValueError

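If neither keys_file_name nor sftp_config is provided, or if the filename filters do not fit the chosen strategy (a filter is passed while remove_filename_independent is True, or not exactly one filter is passed while it is False).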

Example
from physical_operations_utils.sftp_utils import delete_oldest_files_from_sftp

# Delete oldest files that start with "log_" while keeping the 100 most recent
deleted = delete_oldest_files_from_sftp(
    path_on_sftp="/logs",
    keep_n_files=100,
    sftp_config={
        "host": "sftp.example.com",
        "port": 22,
        "username": "user",
        "secret": "sftp-secret-name-in-key-vault"
    },
    filename_starts_with="log_"
)

print(f"Deleted {deleted} old log files.")
Source code in physical_operations_utils/sftp_utils.py
def delete_oldest_files_from_sftp(
    path_on_sftp: str,
    keep_n_files: int,
    keys_file_name: str = None,
    sftp_config: dict = None,
    remove_filename_independent: bool = False,
    filename_starts_with: str = None,
    filename_ends_with: str = None,
    filename_contains: str = None,
) -> int:
    """
    Deletes the oldest files from an SFTP directory, keeping only the specified number of most recently modified ones.

    The function supports both filename-dependent and filename-independent deletion. If `remove_filename_independent` is True, the
    oldest files are deleted regardless of their names, and no filename filter may be passed. If it is False, exactly one filename filter
    (`filename_starts_with`, `filename_ends_with`, or `filename_contains`) must be passed, and only files matching it are considered for deletion.

    Either `keys_file_name` for KEYS_FILE usage or `sftp_config` (with direct credentials) must be provided.

    Args:
        path_on_sftp (str): The remote directory path on the SFTP server.
        keep_n_files (int): The number of most recently modified files to keep.
        keys_file_name (str, optional): Name of the key in the YAML configuration for credentials.
        sftp_config (dict, optional): Dictionary with keys "host", "port", "username", and "secret".
        remove_filename_independent (bool): If True, deletes files without considering filename patterns.
        filename_starts_with (str, optional): Match files whose names start with this string. Only active when remove_filename_independent is False. Filename filters are mutually exclusive.
        filename_ends_with (str, optional): Match files whose names end with this string. Only active when remove_filename_independent is False. Filename filters are mutually exclusive.
        filename_contains (str, optional): Match files whose names contain this string. Only active when remove_filename_independent is False. Filename filters are mutually exclusive.

    Returns:
        int: The number of files deleted.

    Raises:
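        ValueError: If neither `keys_file_name` nor `sftp_config` is provided, or if the filename filters do not fit the chosen strategy
            (a filter is passed while `remove_filename_independent` is True, or not exactly one filter is passed while it is False).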

    Example:
        ```python
        from physical_operations_utils.sftp_utils import delete_oldest_files_from_sftp

        # Delete oldest files that start with "log_" while keeping the 100 most recent
        deleted = delete_oldest_files_from_sftp(
            path_on_sftp="/logs",
            keep_n_files=100,
            sftp_config={
                "host": "sftp.example.com",
                "port": 22,
                "username": "user",
                "secret": "sftp-secret-name-in-key-vault"
            },
            filename_starts_with="log_"
        )

        print(f"Deleted {deleted} old log files.")
        ```
    """
    if not keys_file_name and not sftp_config:
        raise ValueError("Either keys_file_name or sftp_config must be provided")

    # Validate mutually exclusive filename filters
    filters = [filename_starts_with, filename_ends_with, filename_contains]
    filters_provided = sum(f is not None for f in filters)

    if remove_filename_independent:
        if filters_provided != 0:
            raise ValueError(
                "When filename-independent deletion is used, none of filename_starts_with, filename_ends_with or filename_contains may be provided"
            )
    else:
        if filters_provided != 1:
            raise ValueError(
                "When filename-dependent deletion is used, exactly one of filename_starts_with, filename_ends_with or filename_contains must be provided"
            )

    sftp_client = (
        SFTPClient(keys_file_name=keys_file_name)
        if keys_file_name
        else SFTPClient(sftp_config=sftp_config)
    )

    file_information = sftp_client.list_file_details(remote_path=path_on_sftp)

    file_information_only_files = [
        info for info in file_information if not info["is_directory"]
    ]

    if remove_filename_independent:
        filename_pattern_matches = file_information_only_files.copy()
    else:
        filename_pattern_matches = []
        for file_info in file_information_only_files:
            file_name: str = file_info.get("filename")
            if (
                (filename_starts_with and file_name.startswith(filename_starts_with))
                or (filename_ends_with and file_name.endswith(filename_ends_with))
                or (filename_contains and filename_contains in file_name)
            ):
                filename_pattern_matches.append(file_info)
    logging.warning(
        f"Found {len(filename_pattern_matches)} files that match the pattern"
    )

    filename_pattern_matches_sorted = sorted(
        filename_pattern_matches, key=lambda x: x["last_modified"]
    )
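    # Compute the count explicitly: with keep_n_files == 0 the slice [:-0] is empty
    # and nothing would be deleted.
    n_to_delete = max(len(filename_pattern_matches_sorted) - keep_n_files, 0)
    files_to_delete = filename_pattern_matches_sorted[:n_to_delete]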

    for file_info in files_to_delete:
        sftp_client.delete_file(remote_path=f"{path_on_sftp}/{file_info['filename']}")

    logging.warning(f"Deleted {len(files_to_delete)} files from {path_on_sftp}")
    return len(files_to_delete)