Source code for ml_cloud_tools.s3

# Copyright (c) 2021 Timothy Wolff-Piggott, Deutsche Telekom AG
# Copyright (c) 2021 Philip May, Deutsche Telekom AG
# This software is distributed under the terms of the MIT license
# which is available at https://opensource.org/licenses/MIT

"""S3 tools."""

import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional

import boto3


_logger = logging.getLogger(__name__)


def _get_s3_bucket_name(s3_bucket_name: Optional[str] = None) -> str:
    if s3_bucket_name is None:
        s3_bucket_name = os.getenv("DEFAULT_S3_BUCKET_NAME")
    if s3_bucket_name is None:
        raise ValueError(
            "S3 bucket name must be set by parameter or the "
            "'DEFAULT_S3_BUCKET_NAME' environment variable!"
        )
    _logger.debug("Using s3_bucket_name: %s", s3_bucket_name)
    return s3_bucket_name


def _get_s3_bucket(s3_bucket_name: Optional[str] = None):
    s3_bucket_name = _get_s3_bucket_name(s3_bucket_name)
    # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/boto3.html?highlight=resource#boto3.resource  # NOQA: E501
    s3_resource = boto3.resource("s3")
    # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#bucket
    bucket = s3_resource.Bucket(s3_bucket_name)
    return bucket


[docs]def copy_s3_file_to_file( s3_file_name: str, local_file_name: str, s3_bucket_name: Optional[str] = None, overwrite: bool = True, s3_kwargs: Optional[Dict[str, Any]] = None, ) -> None: """Copy a file from S3 to a file on the local file system. Download the S3 file at ``s3_dir_name`` from the S3 bucket ``s3_bucket_name`` to the local file ``local_file_name``. Args: s3_file_name: Name of the so called key to download from. This is the part after the ``s3_bucket_name``. Example: ``/foo/bar/baz.txt`` local_file_name: Local path to the file to download to. Example: ``/home/my_username/baz.txt`` s3_bucket_name: S3 bucket name. Can also be provided by the ``DEFAULT_S3_BUCKET_NAME`` environment variable. One of the two must be specified. If both are specified this argument has priority. overwrite: Overwrite local file. s3_kwargs: Additional kwargs to be passed to the S3 client function :meth:`S3.Bucket.download_file`. """ s3_kwargs = {} if s3_kwargs is None else s3_kwargs if (not overwrite) and Path(local_file_name).is_file(): _logger.debug("File %s is already available. Skipping it.", local_file_name) else: _logger.debug("Copying S3 file %s to %s", s3_file_name, local_file_name) s3_bucket = _get_s3_bucket(s3_bucket_name) # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.download_file # NOQA: E501 s3_bucket.download_file(s3_file_name, local_file_name, **s3_kwargs)
[docs]def copy_file_to_s3_file( local_file_name: str, s3_file_name: str, s3_bucket_name: Optional[str] = None, s3_kwargs: Optional[Dict[str, Any]] = None, ) -> None: """Copy a file on the local file system to a file on S3. Upload a local file ``local_file_name`` to the S3 file at ``s3_dir_name`` from the S3 bucket ``s3_bucket_name``. Args: local_file_name: Local path to the file to upload. Example: ``/home/my_username/baz.txt`` s3_file_name: Name of the so called key to upload to. This is the part after the ``s3_bucket_name``. Example: ``/foo/bar/baz.txt`` s3_bucket_name: S3 bucket name. Can also be provided by the ``DEFAULT_S3_BUCKET_NAME`` environment variable. One of the two must be specified. If both are specified this argument has priority. s3_kwargs: Additional kwargs to be passed to the S3 client function :meth:`S3.Bucket.upload_file`. """ s3_kwargs = {} if s3_kwargs is None else s3_kwargs _logger.debug("Copying %s to S3 file %s", local_file_name, s3_file_name) s3_bucket = _get_s3_bucket(s3_bucket_name) # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.upload_file # NOQA: E501 s3_bucket.upload_file(local_file_name, s3_file_name, **s3_kwargs)
[docs]def copy_s3_dir_to_dir( s3_dir_name: str, local_dir_name: str, s3_bucket_name: Optional[str] = None, overwrite: bool = True, s3_kwargs: Optional[Dict[str, Any]] = None, ) -> str: """Copy a directory from S3 to a directory on the local file system. If you call this function with ``s3_dir_name = "a/x"`` and ``local_dir_name = "y"`` it will create a local directory ``y/x`` and copy the S3 content in ``a/x`` to that location. This way a S3 file at ``a/x/file.txt`` would be copied to ``y/x/file.txt``. Args: s3_dir_name: Name of the S3 directory. This is the part after the ``s3_bucket_name``. Example: ``/foo/bar`` local_dir_name: Name of the local directory. s3_bucket_name: S3 bucket name. Can also be provided by the ``DEFAULT_S3_BUCKET_NAME`` environment variable. One of the two must be specified. If both are specified this argument has priority. overwrite: Overwrite already existing files. s3_kwargs: Additional kwargs to be passed to the S3 client function :meth:`S3.Bucket.download_file`. Returns: Local directory where files are stored. In the example above, this would be ``y/x``. """ s3_kwargs = {} if s3_kwargs is None else s3_kwargs local_dir_path = Path(local_dir_name) if not local_dir_path.is_dir(): raise ValueError(f"'local_dir_name' must be a directory! It was: {local_dir_name}") s3_bucket = _get_s3_bucket(s3_bucket_name) final_local_dir_path = local_dir_path / Path(s3_dir_name).name for obj in s3_bucket.objects.filter(Prefix=s3_dir_name): if obj.key[-1] == "/": # TODO: this might not be needed _logger.debug("Skipping dir %s", obj.key) else: local_path = final_local_dir_path / Path(obj.key).relative_to(s3_dir_name) if (not overwrite) and local_path.is_file(): _logger.debug("File %s is already available. Skipping it.", local_path.as_posix()) else: _logger.debug("Copying S3 %s to %s", obj.key, local_path.as_posix()) local_path.parents[0].mkdir(exist_ok=True, parents=True) # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.download_file # NOQA: E501 s3_bucket.download_file(obj.key, local_path.as_posix(), **s3_kwargs) return final_local_dir_path.as_posix()
[docs]def copy_dir_to_s3_dir( local_dir_name: str, s3_dir_name: str, s3_bucket_name: Optional[str] = None, s3_kwargs: Optional[Dict[str, Any]] = None, ) -> str: """Copy a directory from the local file system to a directory on S3. If you call this function with ``local_dir_name = "a/x"`` and ``s3_dir_name = "y"`` it will copy the content in ``a/x`` to the S3 location below ``y/x``. This way the local file at ``a/x/file.txt`` would be copied to S3 at the location ``y/x/tfile.txt``. Args: local_dir_name: Name of the local directory. s3_dir_name: Name of the S3 directory. This is the part after the ``s3_bucket_name``. Example: ``/foo/bar`` s3_bucket_name: S3 bucket name. Can also be provided by the ``DEFAULT_S3_BUCKET_NAME`` environment variable. One of the two must be specified. If both are specified this argument has priority. s3_kwargs: Additional kwargs to be passed to the S3 client function :meth:`S3.Bucket.upload_file`. Returns: S3 directory where files are stored. In the example above, this would be ``y/x``. """ s3_kwargs = {} if s3_kwargs is None else s3_kwargs local_dir_path = Path(local_dir_name) if not local_dir_path.is_dir(): raise ValueError(f"'local_dir_name' must be a directory! It was: {local_dir_name}") s3_bucket = _get_s3_bucket(s3_bucket_name) final_s3_dir_path = Path(s3_dir_name) / local_dir_path.name _logger.debug( "Uploading dir %s to S3 dir %s", local_dir_path.as_posix(), final_s3_dir_path.as_posix() ) for file_path in local_dir_path.glob("**/*"): if file_path.is_dir(): # s3 has no directories, just files with prefixes _logger.debug("Skipping dir %s", file_path.as_posix()) else: s3_file_path = final_s3_dir_path / file_path.relative_to(local_dir_path) _logger.debug("Copying %s to S3 %s", file_path.as_posix(), s3_file_path.as_posix()) # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.upload_file # NOQA: E501 s3_bucket.upload_file(file_path.as_posix(), s3_file_path.as_posix(), **s3_kwargs) return final_s3_dir_path.as_posix()
[docs]def list_s3_files( s3_dir_name: str, s3_bucket_name: Optional[str] = None, s3_kwargs: Optional[Dict[str, Any]] = None, ) -> List[str]: """List files in S3 directory. Args: s3_dir_name: Name of the S3 directory. This is the part after the ``s3_bucket_name``. Example: ``/foo/bar`` s3_bucket_name: S3 bucket name. Can also be provided by the ``DEFAULT_S3_BUCKET_NAME`` environment variable. One of the two must be specified. If both are specified this argument has priority. s3_kwargs: Additional kwargs to be passed to the S3 client function :meth:`S3.Client.list_objects_v2`. Returns: List of files in ``s3_dir_name``. """ s3_kwargs = {} if s3_kwargs is None else s3_kwargs s3_bucket_name = _get_s3_bucket_name(s3_bucket_name) s3_client = boto3.client("s3") response = s3_client.list_objects_v2(Bucket=s3_bucket_name, Prefix=s3_dir_name, **s3_kwargs) if response["KeyCount"] == 0: _logger.warning("S3 directory is empty. s3_dir_name: %s", s3_dir_name) return [] files = [key_dict["Key"] for key_dict in response["Contents"]] while response["IsTruncated"]: _logger.debug("Got continuation token, re-listing") continuation_token = response["NextContinuationToken"] response = s3_client.list_objects_v2( Bucket=s3_bucket_name, Prefix=s3_dir_name, ContinuationToken=continuation_token, **s3_kwargs, ) files.extend(key_dict["Key"] for key_dict in response["Contents"]) return files