Skip to content

Documentation for aalibrary

Modules:

Name Description
config

Used for storing environment-specific settings such as database URIs and

conversion

This file is used to store conversion functions for the AALibrary.

egress

This file contains functions related to data egress, such as uploading

ices_ship_names

This file contains the code to parse through the ICES API found here:

ingestion

This file contains functions used to ingest Active Acoustics data into GCP

metadata

This file contains functions that have to do with the metadata DB that

queries

This script contains classes that have SQL queries used for interaction

quick_test

Contains quick tests for the API to verify that the connections are working

config

Used for storing environment-specific settings such as database URIs and such. Also contains functions for setting environment variables to use different GCP resources.

Functions:

Name Description
get_current_gcp_bucket_name

Returns the current GCP bucket name being used.

get_current_gcp_project_id

Returns the current GCP project ID being used.

use_custom_gcp_environment

Sets environment variables to use the custom GCP resources specified by

use_gcp_dev

Sets environment variables to use GCP development resources.

use_gcp_prod

Sets environment variables to use GCP production resources.

get_current_gcp_bucket_name()

Returns the current GCP bucket name being used.

Source code in src\aalibrary\config.py
def get_current_gcp_bucket_name() -> str:
    """Look up the GCP bucket name the library is currently configured for.

    The value is read from the `AALIBRARY_GCP_BUCKET_NAME` environment
    variable. If that variable is not set, `None` is returned.
    """
    return os.environ.get("AALIBRARY_GCP_BUCKET_NAME")

get_current_gcp_project_id()

Returns the current GCP project ID being used.

Source code in src\aalibrary\config.py
def get_current_gcp_project_id() -> str:
    """Look up the GCP project ID the library is currently configured for.

    The value is read from the `AALIBRARY_GCP_PROJECT_ID` environment
    variable. If that variable is not set, `None` is returned.
    """
    return os.environ.get("AALIBRARY_GCP_PROJECT_ID")

use_custom_gcp_environment(project_id, bucket_name)

Sets environment variables to use the custom GCP resources specified by the user.

Parameters:

Name Type Description Default
project_id str

The GCP project ID to use.

required
bucket_name str

The GCP bucket name to use.

required
Source code in src\aalibrary\config.py
def use_custom_gcp_environment(project_id: str, bucket_name: str) -> None:
    """Points the library at user-specified GCP resources.

    Stores the given project ID and bucket name in the
    `AALIBRARY_GCP_PROJECT_ID` and `AALIBRARY_GCP_BUCKET_NAME` environment
    variables, then emits a debug log message describing the new environment.

    Args:
        project_id (str): The GCP project ID to use.
        bucket_name (str): The GCP bucket name to use.
    """
    # Project ID is written first, then the bucket name.
    os.environ.update(
        {
            "AALIBRARY_GCP_PROJECT_ID": project_id,
            "AALIBRARY_GCP_BUCKET_NAME": bucket_name,
        }
    )
    message = (
        "You are now using a custom GCP environment with project ID"
        f" '{project_id}' and bucket name '{bucket_name}'."
    )
    logger.debug(message)

use_gcp_dev()

Sets environment variables to use GCP development resources.

Source code in src\aalibrary\config.py
def use_gcp_dev() -> None:
    """Points the library at the GCP development resources.

    Copies the module-level `GCP_DEV_PROJECT_ID` and `GCP_DEV_BUCKET_NAME`
    values into the `AALIBRARY_GCP_PROJECT_ID` and
    `AALIBRARY_GCP_BUCKET_NAME` environment variables.
    """
    # Project ID is written first, then the bucket name.
    os.environ.update(
        {
            "AALIBRARY_GCP_PROJECT_ID": GCP_DEV_PROJECT_ID,
            "AALIBRARY_GCP_BUCKET_NAME": GCP_DEV_BUCKET_NAME,
        }
    )
    logger.debug("You are now using the GCP Dev environment.")

use_gcp_prod()

Sets environment variables to use GCP production resources.

Source code in src\aalibrary\config.py
def use_gcp_prod() -> None:
    """Points the library at the GCP production resources.

    Copies the module-level `GCP_PROD_PROJECT_ID` and `GCP_PROD_BUCKET_NAME`
    values into the `AALIBRARY_GCP_PROJECT_ID` and
    `AALIBRARY_GCP_BUCKET_NAME` environment variables.
    """
    # Project ID is written first, then the bucket name.
    os.environ.update(
        {
            "AALIBRARY_GCP_PROJECT_ID": GCP_PROD_PROJECT_ID,
            "AALIBRARY_GCP_BUCKET_NAME": GCP_PROD_BUCKET_NAME,
        }
    )
    logger.debug("You are now using the GCP Prod environment.")

conversion

This file is used to store conversion functions for the AALibrary.

Functions:

Name Description
convert_local_raw_to_ices_netcdf

ENTRYPOINT FOR END-USERS

convert_local_raw_to_netcdf

ENTRYPOINT FOR END-USERS

convert_raw_to_netcdf

ENTRYPOINT FOR END-USERS

convert_raw_to_netcdf_ices

ENTRYPOINT FOR END-USERS

convert_local_raw_to_ices_netcdf(raw_file_location='', netcdf_file_download_directory='', echosounder='', delete_raw_after=False)

ENTRYPOINT FOR END-USERS. Converts a local (on your computer) file from raw into an ICES netcdf using echopype.

Parameters:

Name Type Description Default
raw_file_location str

The location of the raw file. Defaults to "".

''
netcdf_file_download_directory str

The location you want to download your netcdf file to. Defaults to "".

''
echosounder str

The echosounder used. Can be one of ["EK80", "EK60"]. Defaults to "".

''
delete_raw_after bool

Whether or not to delete the raw file after conversion is complete. Defaults to False.

False
Source code in src\aalibrary\conversion.py
def convert_local_raw_to_ices_netcdf(
    raw_file_location: str = "",
    netcdf_file_download_directory: str = "",
    echosounder: str = "",
    delete_raw_after: bool = False,
):
    """ENTRYPOINT FOR END-USERS
    Converts a local (on your computer) file from raw into an ICES netcdf
    using echopype.

    Only the "EK80" and "EK60" echosounders are supported. Any other value is
    rejected before the raw file is opened, so a raw file is never deleted
    without having been converted first.

    Args:
        raw_file_location (str, optional): The location of the raw file.
            Defaults to "".
        netcdf_file_download_directory (str, optional): The location you want
            to download your netcdf file to. Defaults to "".
        echosounder (str, optional): The echosounder used. Can be one of
            ["EK80", "EK60"] (case-insensitive). Defaults to "".
        delete_raw_after (bool, optional): Whether or not to delete the raw
            file after conversion is complete. Defaults to False.

    Raises:
        ValueError: If `echosounder` is not one of "EK80" or "EK60".
        AssertionError: If the echosounder specified does not match the
            echosounder found within the raw file.
        Exception: Any error raised while opening or converting the raw file
            is logged and then re-raised unchanged.
    """

    netcdf_file_download_directory = os.path.normpath(
        netcdf_file_download_directory
    )
    print(f"netcdf_file_download_directory {netcdf_file_download_directory}")

    # Reject unsupported echosounders up front. Previously this case only
    # printed a warning and carried on, which reported "CONVERTED." without
    # converting anything and could delete the raw file.
    sonar_model = echosounder.lower()
    if sonar_model not in ("ek80", "ek60"):
        unsupported_message = (
            f"THE ECHOSOUNDER SPECIFIED `{echosounder}` IS NOT SUPPORTED FOR "
            "ICES NETCDF CONVERSION. PLEASE USE `EK80` OR `EK60`."
        )
        print(unsupported_message)
        raise ValueError(unsupported_message)

    # Create the download directory (path) if it doesn't exist
    if not os.path.exists(netcdf_file_download_directory):
        os.makedirs(netcdf_file_download_directory, exist_ok=True)

    # Make sure the echosounder specified matches the raw file data.
    if sonar_model == "ek80":
        matches_raw_file = sonar_checker.is_EK80(
            raw_file=raw_file_location, storage_options={}
        )
    else:
        matches_raw_file = sonar_checker.is_EK60(
            raw_file=raw_file_location, storage_options={}
        )
    if not matches_raw_file:
        # Raised explicitly (instead of using an `assert` statement) so the
        # check still runs under `python -O`; the exception type is kept for
        # backward compatibility.
        raise AssertionError(
            f"THE ECHOSOUNDER SPECIFIED `{echosounder}` DOES NOT MATCH THE "
            "ECHOSOUNDER FOUND WITHIN THE RAW FILE."
        )

    try:
        print("CONVERTING RAW TO NETCDF...")
        raw_file_echopype = open_raw(
            raw_file=raw_file_location, sonar_model=echosounder
        )
        if sonar_model == "ek80":
            echopype_ek80_raw_to_ices_netcdf(
                echodata=raw_file_echopype,
                export_file=netcdf_file_download_directory,
            )
        else:
            echopype_ek60_raw_to_ices_netcdf(
                echodata=raw_file_echopype,
                export_file=netcdf_file_download_directory,
            )
        print("CONVERTED.")
    except Exception as e:
        logging.error(
            "COULD NOT CONVERT `%s` DUE TO ERROR %s", raw_file_location, e
        )
        # Bare `raise` keeps the original traceback intact.
        raise

    # Only reached after a successful conversion. Deletion is best-effort:
    # a failure is reported but does not fail the conversion.
    if delete_raw_after:
        try:
            print("DELETING RAW FILE...")
            os.remove(raw_file_location)
            print("DELETED.")
        except OSError as e:
            print(e)
            print("THE RAW FILE COULD NOT BE DELETED DUE TO THE ERROR ABOVE.")

convert_local_raw_to_netcdf(raw_file_location='', netcdf_file_download_directory='', echosounder='', overwrite=False, delete_raw_after=False)

ENTRYPOINT FOR END-USERS Converts a local (on your computer) file from raw into netcdf using echopype.

Parameters:

Name Type Description Default
raw_file_location str

The location of the raw file. Defaults to "".

''
netcdf_file_download_directory str

The location you want to download your netcdf file to. Defaults to "".

''
echosounder str

The echosounder used. Can be one of ["EK80", "EK60", "AZFP6", "AZFP", "AD2CP", "ER60"]. Defaults to "".

''
overwrite bool

Whether or not to overwrite the netcdf file. Defaults to False.

False
delete_raw_after bool

Whether or not to delete the raw file after conversion is complete. Defaults to False.

False
Source code in src\aalibrary\conversion.py
def convert_local_raw_to_netcdf(
    raw_file_location: str = "",
    netcdf_file_download_directory: str = "",
    echosounder: str = "",
    overwrite: bool = False,
    delete_raw_after: bool = False,
):
    """ENTRYPOINT FOR END-USERS
    Converts a local (on your computer) file from raw into netcdf using
    echopype.

    Args:
        raw_file_location (str, optional): The location of the raw file.
            Defaults to "".
        netcdf_file_download_directory (str, optional): The location you want
            to download your netcdf file to. Defaults to "".
        echosounder (str, optional): The echosounder used. The raw file is
            checked against the echosounder when it is one of
            ["EK80", "EK60", "AZFP6", "AZFP", "AD2CP", "ER60"]
            (case-insensitive); any other value is passed straight to
            `open_raw`. Defaults to "".
        overwrite (bool, optional): Whether or not to overwrite the netcdf
            file. Defaults to False.
        delete_raw_after (bool, optional): Whether or not to delete the raw
            file after conversion is complete. Defaults to False.

    Raises:
        AssertionError: If the echosounder specified does not match the
            echosounder found within the raw file.
        Exception: Any error raised while opening or converting the raw file
            is logged and then re-raised unchanged.
    """

    netcdf_file_download_directory = os.path.normpath(
        netcdf_file_download_directory
    )
    print(f"netcdf_file_download_directory {netcdf_file_download_directory}")

    # Create the download directory (path) if it doesn't exist
    if not os.path.exists(netcdf_file_download_directory):
        os.makedirs(netcdf_file_download_directory, exist_ok=True)

    # Make sure the echosounder specified matches the raw file data.
    # Maps the lower-cased echosounder name to the `sonar_checker` function
    # that validates it. The function is looked up lazily, so only the checker
    # that is actually needed gets resolved.
    checker_names = {
        "ek80": "is_EK80",
        "ek60": "is_EK60",
        "azfp6": "is_AZFP6",
        "azfp": "is_AZFP",
        "ad2cp": "is_AD2CP",
        "er60": "is_ER60",
    }
    checker_name = checker_names.get(echosounder.lower())
    if checker_name is not None:
        matches_raw_file = getattr(sonar_checker, checker_name)(
            raw_file=raw_file_location, storage_options={}
        )
        if not matches_raw_file:
            # Raised explicitly (instead of using an `assert` statement) so
            # the check still runs under `python -O`; the exception type is
            # kept for backward compatibility.
            raise AssertionError(
                f"THE ECHOSOUNDER SPECIFIED `{echosounder}` DOES NOT MATCH "
                "THE ECHOSOUNDER FOUND WITHIN THE RAW FILE."
            )
    # Echosounders without a checker are not validated here; `open_raw`
    # receives the value as-is.

    try:
        print("CONVERTING RAW TO NETCDF...")
        raw_file_echopype = open_raw(
            raw_file=raw_file_location, sonar_model=echosounder
        )
        raw_file_echopype.to_netcdf(
            save_path=netcdf_file_download_directory, overwrite=overwrite
        )
        print("CONVERTED.")
    except Exception as e:
        logging.error(
            "COULD NOT CONVERT `%s` DUE TO ERROR %s", raw_file_location, e
        )
        # Bare `raise` keeps the original traceback intact.
        raise

    # Only reached after a successful conversion. Deletion is best-effort:
    # a failure is reported but does not fail the conversion.
    if delete_raw_after:
        try:
            print("DELETING RAW FILE...")
            os.remove(raw_file_location)
            print("DELETED.")
        except OSError as e:
            print(e)
            print("THE RAW FILE COULD NOT BE DELETED DUE TO THE ERROR ABOVE.")

convert_raw_to_netcdf(file_name='', file_type='raw', ship_name='', survey_name='', echosounder='', data_source='', file_download_directory='', overwrite=False, delete_raw_after=False, gcp_bucket=None, is_metadata=False, debug=False)

ENTRYPOINT FOR END-USERS This function allows one to convert a file from raw to netcdf. Then uploads the file to GCP storage for caching.

Parameters:

Name Type Description Default
file_name str

The file name (includes extension). Defaults to "".

''
file_type str

The file type (do not include the dot "."). Defaults to "raw".

'raw'
ship_name str

The ship name associated with this survey. Defaults to "".

''
survey_name str

The survey name/identifier. Defaults to "".

''
echosounder str

The echosounder used to gather the data. Defaults to "".

''
data_source str

The source of the file. Necessary due to the way the storage bucket is organized. Can be one of ["NCEI", "OMAO", "HDD"]. Defaults to "".

''
file_download_directory str

The local directory you want to store your file in. Defaults to "".

''
overwrite bool

Whether or not to overwrite the netcdf file. Defaults to False.

False
delete_raw_after bool

Whether or not to delete the raw file after conversion is complete. Defaults to False.

False
gcp_bucket bucket

The GCP bucket object used to download the file. Defaults to None.

None
is_metadata bool

Whether or not the file is a metadata file. Necessary since files that are considered metadata (metadata json, or readmes) are stored in a separate directory. Defaults to False.

False
debug bool

Whether or not to print debug statements. Defaults to False.

False
Source code in src\aalibrary\conversion.py
def convert_raw_to_netcdf(
    file_name: str = "",
    file_type: str = "raw",
    ship_name: str = "",
    survey_name: str = "",
    echosounder: str = "",
    data_source: str = "",
    file_download_directory: str = "",
    overwrite: bool = False,
    delete_raw_after: bool = False,
    gcp_bucket: storage.Client.bucket = None,
    is_metadata: bool = False,
    debug: bool = False,
):
    """ENTRYPOINT FOR END-USERS
    Produces a netcdf version of a raw file, using the copy cached on GCP
    when one exists.

    If a netcdf version is already present on GCP, it is downloaded and
    nothing else happens. Otherwise the raw file is downloaded, converted to
    netcdf locally, and the resulting netcdf (plus its metadata) is uploaded
    to GCP storage for caching.

    Args:
        file_name (str, optional): The file name (includes extension).
            Defaults to "".
        file_type (str, optional): The file type (do not include the dot ".").
            Defaults to "raw".
        ship_name (str, optional): The ship name associated with this survey.
            Defaults to "".
        survey_name (str, optional): The survey name/identifier. Defaults
            to "".
        echosounder (str, optional): The echosounder used to gather the data.
            Defaults to "".
        data_source (str, optional): The source of the file. Necessary due to
            the way the storage bucket is organized. Can be one of
            ["NCEI", "OMAO", "HDD"]. Defaults to "".
        file_download_directory (str, optional): The local directory you want
            to store your file in. Defaults to "".
        overwrite (bool, optional): Whether or not to overwrite the netcdf
            file. Defaults to False.
        delete_raw_after (bool, optional): Whether or not to delete the raw
            file after conversion is complete. Defaults to False.
        gcp_bucket (storage.Client.bucket, optional): The GCP bucket object
            used to download the file. Defaults to None.
        is_metadata (bool, optional): Whether or not the file is a metadata
            file. Necessary since files that are considered metadata (metadata
            json, or readmes) are stored in a separate directory. Defaults to
            False.
        debug (bool, optional): Whether or not to print debug statements.
            Defaults to False.
    """
    # TODO: Implement an 'upload' param default to True.

    # Bundle every user-supplied argument into a RawFile; all later steps read
    # their values from this object.
    raw_file_info = RawFile(
        file_name=file_name,
        file_type=file_type,
        ship_name=ship_name,
        survey_name=survey_name,
        echosounder=echosounder,
        data_source=data_source,
        file_download_directory=file_download_directory,
        overwrite=overwrite,
        gcp_bucket=gcp_bucket,
        is_metadata=is_metadata,
        debug=debug,
    )

    print("CHECKING FOR NETCDF VERSION ON GCP...")

    # Cache hit: a netcdf version already exists on GCP, so download it and
    # skip the download/convert/upload pipeline entirely.
    if raw_file_info.netcdf_file_exists_in_gcp:
        download_netcdf_file(
            raw_file_name=raw_file_info.netcdf_file_name,
            file_type="netcdf",
            ship_name=raw_file_info.ship_name,
            survey_name=raw_file_info.survey_name,
            echosounder=raw_file_info.echosounder,
            data_source=raw_file_info.data_source,
            file_download_directory=raw_file_info.file_download_directory,
            gcp_bucket=raw_file_info.gcp_bucket,
            debug=raw_file_info.debug,
        )
        return

    # Cache miss: fall back to the full pipeline.
    logging.info(
        "FILE `%s` DOES NOT EXIST AS NETCDF. DOWNLOADING/CONVERTING/"
        "UPLOADING RAW...",
        raw_file_info.raw_file_name,
    )

    # Step 1: fetch the raw file. The download function is expected to take
    # care of checking whether the raw file exists in any of the data
    # sources, and fetching it.
    # NOTE(review): unlike `convert_raw_to_netcdf_ices`, no `file_type` is
    # forwarded here -- confirm that relying on the callee's default is
    # intended.
    download_raw_file(
        file_name=raw_file_info.file_name,
        ship_name=raw_file_info.ship_name,
        survey_name=raw_file_info.survey_name,
        echosounder=raw_file_info.echosounder,
        data_source=raw_file_info.data_source,
        file_download_directory=raw_file_info.file_download_directory,
        debug=raw_file_info.debug,
    )

    # Step 2: convert the downloaded raw file to netcdf locally.
    convert_local_raw_to_netcdf(
        raw_file_location=raw_file_info.raw_file_download_path,
        netcdf_file_download_directory=raw_file_info.file_download_directory,
        echosounder=raw_file_info.echosounder,
        overwrite=overwrite,
        delete_raw_after=delete_raw_after,
    )

    # Step 3: upload the netcdf to the correct location for parsing.
    upload_file_to_gcp_storage_bucket(
        file_name=raw_file_info.netcdf_file_name,
        file_type="netcdf",
        ship_name=raw_file_info.ship_name,
        survey_name=raw_file_info.survey_name,
        echosounder=raw_file_info.echosounder,
        file_location=raw_file_info.netcdf_file_download_path,
        gcp_bucket=raw_file_info.gcp_bucket,
        data_source=raw_file_info.data_source,
        is_metadata=False,
        debug=raw_file_info.debug,
    )

    # Step 4: upload the metadata file associated with this netcdf.
    metadata.create_and_upload_metadata_df_for_netcdf(
        rf=raw_file_info,
        debug=raw_file_info.debug,
    )

convert_raw_to_netcdf_ices(file_name='', file_type='raw', ship_name='', survey_name='', echosounder='', data_source='', file_download_directory='', overwrite=False, delete_raw_after=False, gcp_bucket=None, is_metadata=False, debug=False)

ENTRYPOINT FOR END-USERS. This function allows one to convert a file from raw to ICES netcdf. Then uploads the file to GCP storage for caching.

Parameters:

Name Type Description Default
file_name str

The file name (includes extension). Defaults to "".

''
file_type str

The file type (do not include the dot "."). Defaults to "raw".

'raw'
ship_name str

The ship name associated with this survey. Defaults to "".

''
survey_name str

The survey name/identifier. Defaults to "".

''
echosounder str

The echosounder used to gather the data. Defaults to "".

''
data_source str

The source of the file. Necessary due to the way the storage bucket is organized. Can be one of ["NCEI", "OMAO", "HDD"]. Defaults to "".

''
file_download_directory str

The local directory you want to store your file in. Defaults to "".

''
overwrite bool

Whether or not to overwrite the netcdf file. Defaults to False.

False
delete_raw_after bool

Whether or not to delete the raw file after conversion is complete. Defaults to False.

False
gcp_bucket bucket

The GCP bucket object used to download the file. Defaults to None.

None
is_metadata bool

Whether or not the file is a metadata file. Necessary since files that are considered metadata (metadata json, or readmes) are stored in a separate directory. Defaults to False.

False
debug bool

Whether or not to print debug statements. Defaults to False.

False
Source code in src\aalibrary\conversion.py
def convert_raw_to_netcdf_ices(
    file_name: str = "",
    file_type: str = "raw",
    ship_name: str = "",
    survey_name: str = "",
    echosounder: str = "",
    data_source: str = "",
    file_download_directory: str = "",
    overwrite: bool = False,
    delete_raw_after: bool = False,
    gcp_bucket: storage.Client.bucket = None,
    is_metadata: bool = False,
    debug: bool = False,
):
    """ENTRYPOINT FOR END-USERS
    Produces an ICES netcdf version of a raw file, using the netcdf copy
    cached on GCP when one exists.

    If a netcdf version is already present on GCP, it is downloaded and
    nothing else happens. Otherwise the raw file is downloaded, converted
    locally with `convert_local_raw_to_ices_netcdf`, and the resulting netcdf
    (plus its metadata) is uploaded to GCP storage for caching.

    Args:
        file_name (str, optional): The file name (includes extension).
            Defaults to "".
        file_type (str, optional): The file type (do not include the dot ".").
            Defaults to "raw".
        ship_name (str, optional): The ship name associated with this survey.
            Defaults to "".
        survey_name (str, optional): The survey name/identifier. Defaults
            to "".
        echosounder (str, optional): The echosounder used to gather the data.
            Defaults to "".
        data_source (str, optional): The source of the file. Necessary due to
            the way the storage bucket is organized. Can be one of
            ["NCEI", "OMAO", "HDD"]. Defaults to "".
        file_download_directory (str, optional): The local directory you want
            to store your file in. Defaults to "".
        overwrite (bool, optional): Whether or not to overwrite the netcdf
            file. Defaults to False.
        delete_raw_after (bool, optional): Whether or not to delete the raw
            file after conversion is complete. Defaults to False.
        gcp_bucket (storage.Client.bucket, optional): The GCP bucket object
            used to download the file. Defaults to None.
        is_metadata (bool, optional): Whether or not the file is a metadata
            file. Necessary since files that are considered metadata (metadata
            json, or readmes) are stored in a separate directory. Defaults to
            False.
        debug (bool, optional): Whether or not to print debug statements.
            Defaults to False.
    """

    # Bundle every user-supplied argument into a RawFile; all later steps read
    # their values from this object.
    raw_file_info = RawFile(
        file_name=file_name,
        file_type=file_type,
        ship_name=ship_name,
        survey_name=survey_name,
        echosounder=echosounder,
        data_source=data_source,
        file_download_directory=file_download_directory,
        overwrite=overwrite,
        gcp_bucket=gcp_bucket,
        is_metadata=is_metadata,
        debug=debug,
    )

    print("CHECKING FOR NETCDF VERSION ON GCP...")

    # Cache hit: a netcdf version already exists on GCP, so download it and
    # skip the download/convert/upload pipeline entirely.
    if raw_file_info.netcdf_file_exists_in_gcp:
        download_netcdf_file(
            raw_file_name=raw_file_info.netcdf_file_name,
            file_type="netcdf",
            ship_name=raw_file_info.ship_name,
            survey_name=raw_file_info.survey_name,
            echosounder=raw_file_info.echosounder,
            data_source=raw_file_info.data_source,
            file_download_directory=raw_file_info.file_download_directory,
            gcp_bucket=raw_file_info.gcp_bucket,
            debug=raw_file_info.debug,
        )
        return

    # Cache miss: fall back to the full pipeline.
    logging.info(
        "FILE `%s` DOES NOT EXIST AS NETCDF. DOWNLOADING/CONVERTING/"
        "UPLOADING RAW...",
        raw_file_info.raw_file_name,
    )

    # Step 1: fetch the raw file. The download function is expected to take
    # care of checking whether the raw file exists in any of the data
    # sources, and fetching it.
    download_raw_file(
        file_name=raw_file_info.file_name,
        file_type=raw_file_info.file_type,
        ship_name=raw_file_info.ship_name,
        survey_name=raw_file_info.survey_name,
        echosounder=raw_file_info.echosounder,
        data_source=raw_file_info.data_source,
        file_download_directory=raw_file_info.file_download_directory,
        debug=raw_file_info.debug,
    )

    # Step 2: convert the downloaded raw file to ICES netcdf locally.
    convert_local_raw_to_ices_netcdf(
        raw_file_location=raw_file_info.raw_file_download_path,
        netcdf_file_download_directory=raw_file_info.file_download_directory,
        echosounder=raw_file_info.echosounder,
        delete_raw_after=delete_raw_after,
    )

    # Step 3: upload the netcdf to the correct location for parsing.
    upload_file_to_gcp_storage_bucket(
        file_name=raw_file_info.netcdf_file_name,
        file_type="netcdf",
        ship_name=raw_file_info.ship_name,
        survey_name=raw_file_info.survey_name,
        echosounder=raw_file_info.echosounder,
        file_location=raw_file_info.netcdf_file_download_path,
        gcp_bucket=raw_file_info.gcp_bucket,
        data_source=raw_file_info.data_source,
        is_metadata=False,
        debug=raw_file_info.debug,
    )

    # Step 4: upload the metadata file associated with this netcdf.
    metadata.create_and_upload_metadata_df_for_netcdf(
        rf=raw_file_info,
        debug=raw_file_info.debug,
    )

egress

This file contains functions related to data egress, such as uploading files to cloud storage services.

Functions:

Name Description
upload_file_to_gcp_storage_bucket

Safely uploads a local file to the storage bucket. Will also check to

upload_folder_as_is_to_gcp

Uploads a local folder and its contents to a GCP storage bucket. Copies

upload_local_auxiliary_files_from_directory_to_gcp_storage_bucket

ENTRYPOINT FOR END-USERS

upload_local_calibration_files_from_directory_to_gcp_storage_bucket

ENTRYPOINT FOR END-USERS

upload_local_echosounder_files_from_directory_to_gcp_storage_bucket

ENTRYPOINT FOR END-USERS

upload_file_to_gcp_storage_bucket(file_name='', file_type='', ship_name='', survey_name='', echosounder='', file_location='', gcp_bucket=None, data_source='', is_metadata=False, is_survey_metadata=False, is_calibration_file=False, is_calibration_mapping_file=False, is_auxiliary_file=False, verbose=True, debug=False)

Safely uploads a local file to the storage bucket. Will also check to see if the file already exists.

Parameters:

Name Type Description Default
file_name str

The file name (includes extension). Defaults to "".

''
file_type str

The file type (do not include the dot "."). Defaults to "".

''
ship_name str

The ship name associated with this survey. Defaults to "".

''
survey_name str

The survey name/identifier. Defaults to "".

''
echosounder str

The echosounder used to gather the data. Defaults to "".

''
file_location str

The local location of the file. Defaults to "".

''
gcp_bucket bucket

The GCP bucket object used to upload the file. Defaults to None.

None
data_source str

The source of the data. Can be one of ["NCEI", "OMAO", "HDD", "TEST"]. Defaults to "".

''
is_metadata bool

Whether or not the file is a metadata file. Necessary since files that are considered metadata (metadata json, or readmes) are stored in a separate directory. Defaults to False.

False
is_survey_metadata bool

Whether or not the file is a metadata file associated with a survey. The files are stored at the survey level, in the metadata/ folder. Defaults to False.

False
is_calibration_file bool

Whether or not the file is a calibration file. These files are stored within their own calibration/ sub-directory within the survey.

False
is_calibration_mapping_file bool

Whether or not a file is a calibration mapping file. This is stored in a separate directory called mappings/ within the calibration/ directory.

False
is_auxiliary_file bool

Whether or not the file is an auxiliary file associated with the survey. These files can be of any extension. And do not necessarily have to be data files.

False
verbose bool

Whether or not you want to print intermediate statements about uploads. Helpful for if you are using progress bars. Defaults to True.

True
debug bool

Whether or not to print debug statements. Defaults to False.

False
Source code in src\aalibrary\egress.py
def upload_file_to_gcp_storage_bucket(
    file_name: str = "",
    file_type: str = "",
    ship_name: str = "",
    survey_name: str = "",
    echosounder: str = "",
    file_location: str = "",
    gcp_bucket: storage.Client.bucket = None,
    data_source: str = "",
    is_metadata: bool = False,
    is_survey_metadata: bool = False,
    is_calibration_file: bool = False,
    is_calibration_mapping_file: bool = False,
    is_auxiliary_file: bool = False,
    verbose: bool = True,
    debug: bool = False,
):
    """Uploads a single local file to the storage bucket unless a blob
    already exists at the computed destination.

    The destination inside the bucket is resolved by
    `helpers.parse_correct_gcp_storage_bucket_location` from the file's
    descriptive arguments. Errors raised while uploading are logged with
    `logging.error` and are NOT re-raised.

    Args:
        file_name (str, optional): The file name (includes extension).
            Defaults to "".
        file_type (str, optional): The file type (do not include the dot ".").
            Defaults to "".
        ship_name (str, optional): The ship name associated with this survey.
            Defaults to "".
        survey_name (str, optional): The survey name/identifier. Defaults
            to "".
        echosounder (str, optional): The echosounder used to gather the data.
            Defaults to "".
        file_location (str, optional): The local location of the file.
            Defaults to "".
        gcp_bucket (storage.Client.bucket, optional): The GCP bucket object
            used to upload the file. Defaults to None.
        data_source (str, optional): The source of the data. Can be one of
            ["NCEI", "OMAO", "HDD", "TEST"]. Defaults to "".
        is_metadata (bool, optional): Whether or not the file is a metadata
            file (metadata json, or readmes). Such files are stored in a
            separate directory. Defaults to False.
        is_survey_metadata (bool, optional): Whether or not the file is a
            metadata file associated with a survey. These are stored at the
            survey level, in the `metadata/` folder. Defaults to False.
        is_calibration_file (bool, optional): Whether or not the file is a
            calibration file. These are stored within their own
            `calibration/` sub-directory within the survey. Defaults to
            False.
        is_calibration_mapping_file (bool, optional): Whether or not the file
            is a calibration mapping file. These are stored in `mappings/`
            within the `calibration/` directory. Defaults to False.
        is_auxiliary_file (bool, optional): Whether or not the file is an
            auxiliary file associated with the survey. These files can be of
            any extension and need not be data files. Defaults to False.
        verbose (bool, optional): Whether or not to print the intermediate
            "uploading"/"uploaded" statements. Turn off when a progress bar
            is in use. Defaults to True.
        debug (bool, optional): Whether or not to print debug statements.
            Defaults to False.
    """

    # Resolve where in the bucket this file belongs.
    destination = helpers.parse_correct_gcp_storage_bucket_location(
        file_name=file_name,
        file_type=file_type,
        ship_name=ship_name,
        survey_name=survey_name,
        echosounder=echosounder,
        data_source=data_source,
        is_metadata=is_metadata,
        is_survey_metadata=is_survey_metadata,
        is_calibration_file=is_calibration_file,
        is_calibration_mapping_file=is_calibration_mapping_file,
        is_auxiliary_file=is_auxiliary_file,
        debug=debug,
    )

    # Guard clause: nothing to upload when the blob is already there. This
    # message is printed regardless of `verbose`.
    already_uploaded = cloud_utils.check_if_file_exists_in_gcp(
        gcp_bucket, file_path=destination
    )
    if already_uploaded:
        print(
            f"INFO: FILE `{file_name}` ALREADY EXISTS IN GCP AT "
            f"`{destination}`."
        )
        return

    try:
        if verbose:
            print(
                f"UPLOADING FILE `{file_name}` TO GCP AT"
                f" `{destination}`..."
            )
        cloud_utils.upload_file_to_gcp_bucket(
            bucket=gcp_bucket,
            blob_file_path=destination,
            local_file_path=file_location,
            debug=debug,
        )
        if verbose:
            print("UPLOADED.")
    except Exception as upload_error:
        # Best-effort upload: report the failure and let the caller carry on.
        logging.error(
            "COULD NOT UPLOAD FILE %s TO GCP (%s) STORAGE BUCKET DUE TO "
            "THE FOLLOWING ERROR:\n%s",
            file_name,
            destination,
            upload_error,
        )

upload_folder_as_is_to_gcp(local_folder_path='', gcp_bucket=None, destination_prefix='', debug=False)

Uploads a local folder and its contents to a GCP storage bucket. Copies the folder AS-IS, maintaining the folder structure. NOTE: USE WITH CAUTION. THIS WILL UPLOAD EVERYTHING IN THE FOLDER IN THE SAME MANNER AS THE FOLDER ITSELF. THIS MEANS THAT RETRIEVAL OF THE FILES MIGHT NOT BE POSSIBLE IF THE FOLDER STRUCTURE DOES NOT ADHERE TO AALIBRARY NAMING CONVENTIONS.

Parameters:

Name Type Description Default
local_folder_path str

The path to the local folder to upload.

''
gcp_bucket bucket

The GCP bucket object used to upload the files. Defaults to None.

None
destination_prefix str

Where to place the folder in the storage bucket. Defaults to "".

''
debug bool

Whether or not to print debug statements. Defaults to False.

False
Source code in src\aalibrary\egress.py
def upload_folder_as_is_to_gcp(
    local_folder_path: str = "",
    gcp_bucket: storage.Client.bucket = None,
    destination_prefix: str = "",
    debug: bool = False,
):
    """Uploads a local folder and its contents to a GCP storage bucket. Copies
    the folder AS-IS, maintaining the folder structure.
    NOTE: USE WITH CAUTION. THIS WILL UPLOAD EVERYTHING IN THE FOLDER IN THE
    SAME MANNER AS THE FOLDER ITSELF. THIS MEANS THAT RETRIEVAL OF THE FILES
    MIGHT NOT BE POSSIBLE IF THE FOLDER STRUCTURE DOES NOT ADHERE TO AALIBRARY
    NAMING CONVENTIONS.

    Files are uploaded in ascending order of (file size, file name). Files
    whose blob name already exists in the bucket are skipped.

    Args:
        local_folder_path (str): The path to the local folder to upload.
            Must be a non-empty path to an existing directory.
        gcp_bucket (storage.Client.bucket, optional): The GCP bucket object
            used to upload the files. When None, a bucket is obtained from
            `cloud_utils.setup_gcp_storage_objs`. Defaults to None.
        destination_prefix (str, optional): Where to place the folder in the
            storage bucket. Defaults to "".
        debug (bool, optional): Whether or not to print debug statements.
            Defaults to False.

    Raises:
        ValueError: If `local_folder_path` is empty.
        NotADirectoryError: If `local_folder_path` does not exist or is not a
            directory.
    """

    # Validate BEFORE normalizing: `os.path.normpath("")` returns ".", so an
    # omitted path would otherwise silently upload the entire current working
    # directory.
    if not local_folder_path:
        raise ValueError("PLEASE SPECIFY A LOCAL FOLDER PATH TO UPLOAD.")

    # normalize the path
    local_folder_path = os.path.normpath(local_folder_path)
    # `os.walk` yields nothing for a missing folder, which would make a typo
    # in the path look like a successful (empty) upload.
    if not os.path.isdir(local_folder_path):
        raise NotADirectoryError(
            f"THE FOLDER `{local_folder_path}` DOES NOT EXIST OR IS NOT A "
            "DIRECTORY."
        )

    # Make sure GCP bucket is setup with default values if None specified.
    if gcp_bucket is None:
        _, _, gcp_bucket = cloud_utils.setup_gcp_storage_objs(verbose=True)

    # Gather (path, (size, name)) for every regular file under the folder.
    file_info = []
    for root, _, files in os.walk(local_folder_path):
        for file_name in files:
            local_file_path = os.path.join(root, file_name)
            if os.path.isfile(local_file_path):
                file_size = os.path.getsize(local_file_path)
                file_info.append((local_file_path, (file_size, file_name)))
    # Smallest files first; ties are broken by file name.
    file_info.sort(key=lambda item: item[1])
    if debug:
        pprint(file_info)

    for local_file_path, (_, file_name) in file_info:
        # Calculate the relative path from the local_folder_path
        relative_path = os.path.relpath(local_file_path, local_folder_path)

        # Construct the GCS blob name. Blob names always use forward slashes,
        # so Windows separators are converted.
        if destination_prefix:
            gcs_blob_name = os.path.join(
                destination_prefix, relative_path
            ).replace("\\", "/")
        else:
            gcs_blob_name = relative_path.replace("\\", "/")

        # Check if file already exists in GCP
        file_exists_in_gcp = cloud_utils.check_if_file_exists_in_gcp(
            bucket=gcp_bucket, file_path=gcs_blob_name
        )
        if file_exists_in_gcp:
            print(
                (
                    f"INFO: FILE `{file_name}` ALREADY EXISTS IN GCP AT"
                    f" `{gcs_blob_name}`. SKIPPING UPLOAD."
                )
            )
        else:
            blob = gcp_bucket.blob(gcs_blob_name)
            blob.upload_from_filename(local_file_path)
            print(f"Uploaded {local_file_path} to {gcs_blob_name}")

upload_local_auxiliary_files_from_directory_to_gcp_storage_bucket(local_auxiliary_directory_to_upload='', ship_name='', survey_name='', echosounder='', data_source='', gcp_bucket=None, debug=False)

ENTRYPOINT FOR END-USERS Uploads all of the files from a local auxiliary directory into the appropriate location in the GCP storage bucket. NOTE: Assumes that all files share the same metadata.

Parameters:

Name Type Description Default
local_auxiliary_directory_to_upload str

The auxiliary directory which contains all of the files you want to upload. Defaults to "".

''
ship_name str

The ship name associated with this survey. Defaults to "".

''
survey_name str

The survey name/identifier. Defaults to "".

''
echosounder str

The echosounder used to gather the data. Defaults to "".

''
data_source str

The source of the file. Necessary due to the way the storage bucket is organized. Can be one of ["NCEI", "OMAO", "HDD"]. Defaults to "".

''
gcp_bucket bucket

The GCP bucket object used to upload the files. Defaults to None.

None
debug bool

Whether or not to print debug statements. Defaults to False.

False
Source code in src\aalibrary\egress.py
def upload_local_auxiliary_files_from_directory_to_gcp_storage_bucket(
    local_auxiliary_directory_to_upload: str = "",
    ship_name: str = "",
    survey_name: str = "",
    echosounder: str = "",
    data_source: str = "",
    gcp_bucket: storage.Client.bucket = None,
    debug: bool = False,
):
    """ENTRYPOINT FOR END-USERS
    Uploads all of the files from a local auxiliary directory into the
    appropriate location in the GCP storage bucket.
    NOTE: Assumes that all files share the same metadata.
    NOTE: Only regular files directly inside the directory are uploaded;
    sub-directories are skipped (with a logged warning).
    NOTE: The final "FILES UPLOADED" count is the number of files handed to
    `upload_file_to_gcp_storage_bucket` without an exception propagating.

    Args:
        local_auxiliary_directory_to_upload (str, optional): The auxiliary
            directory which contains all of the files you want to upload.
            Defaults to "".
        ship_name (str, optional): The ship name associated with this survey.
            Defaults to "".
        survey_name (str, optional): The survey name/identifier. Defaults
            to "".
        echosounder (str, optional): The echosounder used to gather the data.
            Defaults to "".
        data_source (str, optional): The source of the file. Necessary due to
            the way the storage bucket is organized. Can be one of
            ["NCEI", "OMAO", "HDD"]. Defaults to "".
        gcp_bucket (storage.Client.bucket, optional): The GCP bucket object
            used to upload the files. When None, a bucket is obtained from
            `cloud_utils.setup_gcp_storage_objs`. Defaults to None.
        debug (bool, optional): Whether or not to print debug statements.
            Defaults to False.
    """
    # Warn user that this function assumes the same metadata for all files
    # within directory.
    logging.warning(
        (
            "WARNING: THIS FUNCTION ASSUMES THAT ALL FILES WITHIN THIS "
            "DIRECTORY ARE FROM THE SAME SHIP, SURVEY, AND ECHOSOUNDER."
        )
    )
    local_auxiliary_directory_to_upload = os.path.normpath(
        local_auxiliary_directory_to_upload
    )
    # Validate the directory and the descriptive arguments.
    check_for_assertion_errors(
        directory=local_auxiliary_directory_to_upload,
        ship_name=ship_name,
        survey_name=survey_name,
        echosounder=echosounder,
    )

    # normalize ship name
    ship_name_normalized = helpers.normalize_ship_name(ship_name)
    if debug:
        print(f"NORMALIZED SHIP NAME: {ship_name_normalized}")
        print(
            "LOCAL DIRECTORY TO UPLOAD:"
            f" {local_auxiliary_directory_to_upload}"
        )
    # Make sure GCP bucket is setup with default values if None specified.
    if gcp_bucket is None:
        _, _, gcp_bucket = cloud_utils.setup_gcp_storage_objs(verbose=True)

    # List everything directly inside the directory, then keep only regular
    # files: "*" also matches sub-directories, which cannot be uploaded as a
    # single file and would otherwise be counted as uploaded.
    print("CHECKING DIRECTORY FOR FILES...")
    directory_entries = glob.glob(
        os.path.join(local_auxiliary_directory_to_upload, "*")
    )
    all_files = [
        entry for entry in directory_entries if os.path.isfile(entry)
    ]
    skipped_entry_count = len(directory_entries) - len(all_files)
    if skipped_entry_count:
        logging.warning(
            "SKIPPING %d ENTRIES THAT ARE NOT REGULAR FILES "
            "(E.G. SUB-DIRECTORIES).",
            skipped_entry_count,
        )
    file_upload_count = 0
    file_error_count = 0

    # Upload each file to GCP at the correct location.
    if all_files:
        pbar = tqdm(all_files, desc="Uploading auxiliary files")
        for file in pbar:
            try:
                file_name = os.path.basename(file)
                pbar.set_postfix_str(f"{file_name}")
                # Upload file to GCP at the correct storage bucket location.
                # The function already checks if the file exists.
                upload_file_to_gcp_storage_bucket(
                    file_name=file_name,
                    ship_name=ship_name_normalized,
                    survey_name=survey_name,
                    echosounder=echosounder,
                    file_location=file,
                    gcp_bucket=gcp_bucket,
                    data_source=data_source,
                    is_metadata=False,
                    is_survey_metadata=False,
                    is_calibration_file=False,
                    is_calibration_mapping_file=False,
                    is_auxiliary_file=True,
                    verbose=False,
                    debug=debug,
                )
                file_upload_count += 1
            except Exception as e:
                # Keep going so one bad file does not abort the whole batch.
                file_error_count += 1
                print(e)
    print(
        f"{file_upload_count} FILES UPLOADED WITH {file_error_count} ERRORS."
    )

upload_local_calibration_files_from_directory_to_gcp_storage_bucket(local_calibration_directory_to_upload='', ship_name='', survey_name='', echosounder='', data_source='', gcp_bucket=None, debug=False)

ENTRYPOINT FOR END-USERS Uploads all of the files from a local calibration directory into the appropriate location in the GCP storage bucket. NOTE: Assumes that all files share the same metadata.

Parameters:

Name Type Description Default
local_calibration_directory_to_upload str

The calibration directory which contains all of the files you want to upload. Defaults to "".

''
ship_name str

The ship name associated with this survey. Defaults to "".

''
survey_name str

The survey name/identifier. Defaults to "".

''
echosounder str

The echosounder used to gather the data. Defaults to "".

''
data_source str

The source of the file. Necessary due to the way the storage bucket is organized. Can be one of ["NCEI", "OMAO", "HDD"]. Defaults to "".

''
gcp_bucket bucket

The GCP bucket object used to upload the files. Defaults to None.

None
debug bool

Whether or not to print debug statements. Defaults to False.

False
Source code in src\aalibrary\egress.py
def upload_local_calibration_files_from_directory_to_gcp_storage_bucket(
    local_calibration_directory_to_upload: str = "",
    ship_name: str = "",
    survey_name: str = "",
    echosounder: str = "",
    data_source: str = "",
    gcp_bucket: storage.Client.bucket = None,
    debug: bool = False,
):
    """ENTRYPOINT FOR END-USERS
    Uploads all of the files from a local calibration directory into the
    appropriate location in the GCP storage bucket.
    NOTE: Assumes that all files share the same metadata.
    NOTE: Only regular files directly inside the directory are uploaded;
    sub-directories are skipped (with a logged warning).
    NOTE: The final "FILES UPLOADED" count is the number of files handed to
    `upload_file_to_gcp_storage_bucket` without an exception propagating.

    Args:
        local_calibration_directory_to_upload (str, optional): The calibration
            directory which contains all of the files you want to upload.
            Defaults to "".
        ship_name (str, optional): The ship name associated with this survey.
            Defaults to "".
        survey_name (str, optional): The survey name/identifier. Defaults
            to "".
        echosounder (str, optional): The echosounder used to gather the data.
            Defaults to "".
        data_source (str, optional): The source of the file. Necessary due to
            the way the storage bucket is organized. Can be one of
            ["NCEI", "OMAO", "HDD"]. Defaults to "".
        gcp_bucket (storage.Client.bucket, optional): The GCP bucket object
            used to upload the files. When None, a bucket is obtained from
            `cloud_utils.setup_gcp_storage_objs`. Defaults to None.
        debug (bool, optional): Whether or not to print debug statements.
            Defaults to False.
    """
    # Warn user that this function assumes the same metadata for all files
    # within directory.
    logging.warning(
        (
            "WARNING: THIS FUNCTION ASSUMES THAT ALL FILES WITHIN THIS "
            "DIRECTORY ARE FROM THE SAME SHIP, SURVEY, AND ECHOSOUNDER."
        )
    )
    local_calibration_directory_to_upload = os.path.normpath(
        local_calibration_directory_to_upload
    )
    # Validate the directory and the descriptive arguments.
    check_for_assertion_errors(
        directory=local_calibration_directory_to_upload,
        ship_name=ship_name,
        survey_name=survey_name,
        echosounder=echosounder,
    )

    # normalize ship name
    ship_name_normalized = helpers.normalize_ship_name(ship_name)
    if debug:
        print(f"NORMALIZED SHIP NAME: {ship_name_normalized}")
        print(
            "LOCAL DIRECTORY TO UPLOAD:"
            f" {local_calibration_directory_to_upload}"
        )
    # Make sure GCP bucket is setup with default values if None specified.
    if gcp_bucket is None:
        _, _, gcp_bucket = cloud_utils.setup_gcp_storage_objs(verbose=True)

    # List everything directly inside the directory, then keep only regular
    # files: "*" also matches sub-directories, which cannot be uploaded as a
    # single file and would otherwise be counted as uploaded.
    print("CHECKING DIRECTORY FOR FILES...")
    directory_entries = glob.glob(
        os.path.join(local_calibration_directory_to_upload, "*")
    )
    all_files = [
        entry for entry in directory_entries if os.path.isfile(entry)
    ]
    skipped_entry_count = len(directory_entries) - len(all_files)
    if skipped_entry_count:
        logging.warning(
            "SKIPPING %d ENTRIES THAT ARE NOT REGULAR FILES "
            "(E.G. SUB-DIRECTORIES).",
            skipped_entry_count,
        )
    file_upload_count = 0
    file_error_count = 0

    # Upload each file to GCP at the correct location.
    if all_files:
        pbar = tqdm(all_files, desc="Uploading calibration files")
        for file in pbar:
            try:
                file_name = os.path.basename(file)
                pbar.set_postfix_str(f"{file_name}")
                # Upload file to GCP at the correct storage bucket location.
                # The function already checks if the file exists.
                upload_file_to_gcp_storage_bucket(
                    file_name=file_name,
                    ship_name=ship_name_normalized,
                    survey_name=survey_name,
                    echosounder=echosounder,
                    file_location=file,
                    gcp_bucket=gcp_bucket,
                    data_source=data_source,
                    is_metadata=False,
                    is_survey_metadata=False,
                    is_calibration_file=True,
                    is_calibration_mapping_file=False,
                    is_auxiliary_file=False,
                    verbose=False,
                    debug=debug,
                )
                file_upload_count += 1
            except Exception as e:
                # Keep going so one bad file does not abort the whole batch.
                file_error_count += 1
                print(e)
    print(
        f"{file_upload_count} FILES UPLOADED WITH {file_error_count} ERRORS."
    )

upload_local_echosounder_files_from_directory_to_gcp_storage_bucket(local_echosounder_directory_to_upload='', ship_name='', survey_name='', echosounder='', data_source='', gcp_bucket=None, debug=False)

ENTRYPOINT FOR END-USERS Uploads all of the .raw (and their corresponding .idx/.bot/.nc) files from an echosounder directory into the appropriate location in the GCP storage bucket. NOTE: Assumes that all files share the same metadata.

Parameters:

Name Type Description Default
local_echosounder_directory_to_upload str

The echosounder directory which contains all of the files you want to upload. Defaults to "".

''
ship_name str

The ship name associated with this survey. Defaults to "".

''
survey_name str

The survey name/identifier. Defaults to "".

''
echosounder str

The echosounder used to gather the data. Defaults to "".

''
data_source str

The source of the file. Necessary due to the way the storage bucket is organized. Can be one of ["NCEI", "OMAO", "HDD"]. Defaults to "".

''
gcp_bucket bucket

The GCP bucket object used to upload the files. Defaults to None.

None
debug bool

Whether or not to print debug statements. Defaults to False.

False
Source code in src\aalibrary\egress.py
def _upload_echosounder_files_of_type(
    file_paths: list,
    file_type: str,
    ship_name_normalized: str,
    survey_name: str,
    echosounder: str,
    data_source: str,
    gcp_bucket,
    debug: bool,
    after_upload=None,
) -> int:
    """Uploads every path in `file_paths` as `file_type` and returns how many
    files were processed.

    Args:
        file_paths (list): Local paths of the files to upload.
        file_type (str): File type passed to the uploader ("raw", "idx",
            "bot" or "netcdf"); also used in the progress/summary messages.
        ship_name_normalized (str): The normalized ship name.
        survey_name (str): The survey name/identifier.
        echosounder (str): The echosounder used to gather the data.
        data_source (str): The source of the data.
        gcp_bucket: The GCP bucket object used to upload the files.
        debug (bool): Whether or not to print debug statements.
        after_upload (callable, optional): Called as
            `after_upload(file_name, file_path)` after each upload call.
            Defaults to None.

    Returns:
        int: The number of files processed (0 when `file_paths` is empty).
    """
    upload_count = 0
    if not file_paths:
        return upload_count

    for file_path in tqdm(file_paths, desc=f"Uploading {file_type} files"):
        file_name = os.path.basename(file_path)
        # Upload to GCP at the correct storage bucket location.
        # The function already checks if the file exists.
        upload_file_to_gcp_storage_bucket(
            file_name=file_name,
            file_type=file_type,
            ship_name=ship_name_normalized,
            survey_name=survey_name,
            echosounder=echosounder,
            file_location=file_path,
            gcp_bucket=gcp_bucket,
            data_source=data_source,
            is_metadata=False,
            verbose=False,
            debug=debug,
        )
        if after_upload is not None:
            after_upload(file_name, file_path)
        upload_count += 1
    print(f"{upload_count} {file_type.upper()} FILES UPLOADED.")
    return upload_count


def upload_local_echosounder_files_from_directory_to_gcp_storage_bucket(
    local_echosounder_directory_to_upload: str = "",
    ship_name: str = "",
    survey_name: str = "",
    echosounder: str = "",
    data_source: str = "",
    gcp_bucket: storage.Client.bucket = None,
    debug: bool = False,
):
    """ENTRYPOINT FOR END-USERS
    Uploads all of the .raw (and their corresponding .idx/.bot/.nc) files from
    an echosounder directory into the appropriate location in the GCP storage
    bucket. Files are uploaded in the order idx, bot, raw, netcdf. A metadata
    upload is triggered for every raw and netcdf file.
    NOTE: Assumes that all files share the same metadata.

    Args:
        local_echosounder_directory_to_upload (str, optional): The echosounder
            directory which contains all of the files you want to upload.
            Defaults to "".
        ship_name (str, optional): The ship name associated with this survey.
            Defaults to "".
        survey_name (str, optional): The survey name/identifier. Defaults
            to "".
        echosounder (str, optional): The echosounder used to gather the data.
            Defaults to "".
        data_source (str, optional): The source of the file. Necessary due to
            the way the storage bucket is organized. Can be one of
            ["NCEI", "OMAO", "HDD"]. Defaults to "".
        gcp_bucket (storage.Client.bucket, optional): The GCP bucket object
            used to upload the files. When None, a bucket is obtained from
            `cloud_utils.setup_gcp_storage_objs`. Defaults to None.
        debug (bool, optional): Whether or not to print debug statements.
            Defaults to False.
    """

    # Warn user that this function assumes the same metadata for all files
    # within directory.
    logging.warning(
        (
            "WARNING: THIS FUNCTION ASSUMES THAT ALL FILES WITHIN THIS "
            "DIRECTORY ARE FROM THE SAME SHIP, SURVEY, AND ECHOSOUNDER."
        )
    )
    local_echosounder_directory_to_upload = os.path.normpath(
        local_echosounder_directory_to_upload
    )
    # Validate the directory and the descriptive arguments.
    check_for_assertion_errors(
        directory=local_echosounder_directory_to_upload,
        ship_name=ship_name,
        survey_name=survey_name,
        echosounder=echosounder,
    )

    # normalize ship name
    ship_name_normalized = helpers.normalize_ship_name(ship_name)
    if debug:
        print(f"NORMALIZED SHIP NAME: {ship_name_normalized}")
        print(
            "LOCAL DIRECTORY TO UPLOAD:"
            f" {local_echosounder_directory_to_upload}"
        )
    # Make sure GCP bucket is setup with default values if None specified.
    if gcp_bucket is None:
        _, _, gcp_bucket = cloud_utils.setup_gcp_storage_objs(verbose=True)

    def _find_files(extension):
        # Non-recursive: only files directly inside the directory match.
        return glob.glob(
            os.path.join(
                local_echosounder_directory_to_upload, f"*.{extension}"
            )
        )

    # Check (glob) for raw, idx, bot, and netcdf files.
    print("CHECKING DIRECTORY FOR RAW, IDX, BOT, AND NETCDF FILES...")
    raw_files = _find_files("raw")
    idx_files = _find_files("idx")
    bot_files = _find_files("bot")
    netcdf_files = _find_files("nc")

    # Let the user know how many of each file has been found to upload.
    print(
        (
            f"FOUND {len(raw_files)} RAW FILES | {len(idx_files)} IDX FILES |"
            f" {len(bot_files)} BOT FILES | {len(netcdf_files)} NETCDF FILES"
        )
    )

    def _upload_raw_metadata(file_name, file_path):
        metadata.create_and_upload_metadata_df_for_raw(
            rf=RawFile(
                file_name=file_name,
                file_type="raw",
                # NOTE(review): the un-normalized `ship_name` is passed here,
                # while the netcdf branch passes `ship_name_normalized`.
                # Kept as-is because `RawFile` is not visible from here;
                # confirm which form it expects.
                ship_name=ship_name,
                survey_name=survey_name,
                echosounder=echosounder,
                file_download_directory=os.path.dirname(file_path),
                data_source=data_source,
                gcp_bucket=gcp_bucket,
                debug=debug,
            )
        )

    def _upload_netcdf_metadata(file_name, file_path):
        metadata.create_and_upload_metadata_df_for_netcdf(
            rf=RawFile(
                file_name=file_name,
                file_type="netcdf",
                ship_name=ship_name_normalized,
                survey_name=survey_name,
                echosounder=echosounder,
                data_source=data_source,
                gcp_bucket=gcp_bucket,
                debug=debug,
            )
        )

    # Arguments shared by every per-type upload pass.
    shared_upload_kwargs = {
        "ship_name_normalized": ship_name_normalized,
        "survey_name": survey_name,
        "echosounder": echosounder,
        "data_source": data_source,
        "gcp_bucket": gcp_bucket,
        "debug": debug,
    }

    # Upload order: idx, bot, raw, netcdf.
    idx_upload_count = _upload_echosounder_files_of_type(
        idx_files, "idx", **shared_upload_kwargs
    )
    bot_upload_count = _upload_echosounder_files_of_type(
        bot_files, "bot", **shared_upload_kwargs
    )
    raw_upload_count = _upload_echosounder_files_of_type(
        raw_files,
        "raw",
        after_upload=_upload_raw_metadata,
        **shared_upload_kwargs,
    )
    netcdf_upload_count = _upload_echosounder_files_of_type(
        netcdf_files,
        "netcdf",
        after_upload=_upload_netcdf_metadata,
        **shared_upload_kwargs,
    )

    print(
        (
            f"UPLOADS COMPLETE\nRAW ({raw_upload_count}) | IDX "
            f"({idx_upload_count}) | BOT ({bot_upload_count}) | "
            f"NETCDF ({netcdf_upload_count})"
        )
    )

ices_ship_names

This file contains the code to parse through the ICES API found here: https://vocab.ices.dk/?ref=315 Specifically the SHIPC platform code which refers to ship names.

Functions:

Name Description
get_all_ices_ship_codes_and_names

Gets all of the ices ship codes and their corresponding names in a

get_all_ices_ship_names

Gets all of the ICES ship names. You can normalize them to our standards

get_all_ship_info

Gets all of the ship's info from the following URL:

get_ices_code_from_ship_name

Gets the ICES Code for a ship given a ship's name.

get_all_ices_ship_codes_and_names(normalize_ship_names=False)

Gets all of the ices ship codes and their corresponding names in a dictionary format. The keys are the ICES code, and the name is the value.

Parameters:

Name Type Description Default
normalize_ship_names bool

Whether or not to format the ship name according to our own standards. Defaults to False.

False

Returns:

Name Type Description
dict dict

A dict with all of the ICES ships. The keys are the ICES code, and the name is the value.

Source code in src\aalibrary\ices_ship_names.py
def get_all_ices_ship_codes_and_names(
    normalize_ship_names: bool = False,
) -> dict:
    """Builds a dictionary that maps every ICES ship code to its ship name.

    Args:
        normalize_ship_names (bool, optional): When True, every ship name is
            passed through `normalize_ship_name` before being returned.
            Defaults to False.

    Returns:
        dict: ICES code (the entry's "key" field) -> ship name (the entry's
            "description" field).
    """

    # Each entry returned by `get_all_ship_info` is indexed by "key" (the
    # code) and "description" (the ship name).
    codes_to_names = {
        entry["key"]: entry["description"] for entry in get_all_ship_info()
    }

    if not normalize_ship_names:
        return codes_to_names

    return {
        ices_code: normalize_ship_name(raw_name)
        for ices_code, raw_name in codes_to_names.items()
    }

get_all_ices_ship_names(normalize_ship_names=False)

Gets all of the ICES ship names. You can normalize them to our standards if you wish.

Parameters:

Name Type Description Default
normalize_ship_names bool

Whether or not to format the ship name according to our own standards. Defaults to False.

False

Returns:

Name Type Description
List List

A list containing strings of all of the ship names.

Source code in src\aalibrary\ices_ship_names.py
def get_all_ices_ship_names(normalize_ship_names: bool = False) -> List:
    """Collects the name of every ship known to ICES, optionally normalized.

    Args:
        normalize_ship_names (bool, optional): When True, each ship name is
            passed through `normalize_ship_name` before being returned.
            Defaults to False.

    Returns:
        List: A list of ship name strings, one per ship record (taken from
            each record's `description` field).
    """

    # Each record returned by `get_all_ship_info` is a dict.
    ship_names = [record["description"] for record in get_all_ship_info()]
    if not normalize_ship_names:
        return ship_names

    return [normalize_ship_name(ship_name=name) for name in ship_names]

get_all_ship_info()

Gets all of the ships' info from the following URL: https://vocab.ices.dk/services/api/Code/7f9a91e1-fb57-464a-8eb0-697e4b0235b5

Returns:

Name Type Description
List List

A list with dicts of all the ships, including name, ices code, uuids and other fields.

Source code in src\aalibrary\ices_ship_names.py
def get_all_ship_info() -> List:
    """Gets all of the ships' info from the following URL:
    https://vocab.ices.dk/services/api/Code/7f9a91e1-fb57-464a-8eb0-697e4b0235b5

    Returns:
        List: A list with dicts of all the ships, including name, ices code,
            uuids and other fields.

    Raises:
        requests.HTTPError: If the ICES API responds with an error status
            code (4xx/5xx).
    """

    response = requests.get(
        url=(
            "https://vocab.ices.dk/services/api/Code/"
            "7f9a91e1-fb57-464a-8eb0-697e4b0235b5"
        ),
        timeout=10,
    )
    # Fail loudly on an error response instead of parsing an error payload
    # and handing it to callers as if it were the list of ships.
    response.raise_for_status()

    return response.json()

get_ices_code_from_ship_name(ship_name='', is_normalized=False)

Gets the ICES Code for a ship given a ship's name.

Parameters:

Name Type Description Default
ship_name str

The ship name string. Defaults to "".

''
is_normalized bool

Whether or not the ship name is already normalized according to aalibrary standards. Defaults to False.

False

Returns:

Name Type Description
str str

The ICES Code if one has been found. Empty string if it has not.

Source code in src\aalibrary\ices_ship_names.py
def get_ices_code_from_ship_name(
    ship_name: str = "", is_normalized: bool = False
) -> str:
    """Looks up the ICES Code that belongs to a given ship name.

    When the name has no exact match, up to three close matches are printed
    as suggestions (or a notice that none were found) and an empty string is
    returned.

    Args:
        ship_name (str, optional): The ship name string. Defaults to "".
        is_normalized (bool, optional): Whether or not the ship name is already
            normalized according to aalibrary standards. When True, the ICES
            names are normalized too before comparing. Defaults to False.

    Returns:
        str: The ICES Code if one has been found. Empty string if it has not.
    """

    # Invert the code -> name mapping so ship names become the lookup keys.
    names_to_codes = {
        name: code
        for code, name in get_all_ices_ship_codes_and_names(
            normalize_ship_names=is_normalized
        ).items()
    }

    if ship_name in names_to_codes:
        return names_to_codes[ship_name]

    # No exact match in the ICES DB: offer spelling suggestions instead.
    suggestions = get_close_matches(
        ship_name, list(names_to_codes), n=3, cutoff=0.6
    )
    if suggestions:
        message = (
            f"This `ship_name` {ship_name} does not"
            " exist in the ICES database. Did you mean one of the"
            f" following?\n{suggestions}"
        )
    else:
        message = (
            f"This `ship_name` {ship_name} does not"
            " exist in the ICES database. A close match could not be "
            "found."
        )
    print(message)
    return ""

ingestion

This file contains functions used to ingest Active Acoustics data into GCP from various sources such as AWS buckets and Azure Data Lake.

Functions:

Name Description
download_file_from_azure_directory

Downloads a single file from an azure directory using the

download_netcdf_file

ENTRYPOINT FOR END-USERS

download_raw_file

ENTRYPOINT FOR END-USERS

download_raw_file_from_azure

ENTRYPOINT FOR END-USERS

download_raw_file_from_ncei

ENTRYPOINT FOR END-USERS

download_specific_file_from_azure

Creates a DataLakeFileClient and downloads a specific file from

download_survey_from_ncei

Downloads an entire survey from NCEI to a local directory while

find_and_upload_survey_metadata_from_s3

Finds the metadata that is associated with a particular survey in s3,

find_data_source_for_file

Finds the data source of a given filename by checking all possible data

download_file_from_azure_directory(directory_client, file_system='testcontainer', download_directory='./', file_path='')

Downloads a single file from an azure directory using the DataLakeDirectoryClient. Useful for numerous operations, as authentication is only required once for the creation of each DataLakeDirectoryClient.

Parameters:

Name Type Description Default
directory_client DataLakeDirectoryClient

The DataLakeDirectoryClient that will be used to connect to and download from an azure file system in the data lake.

required
file_system str

The file system (container) you wish to download your file from. Defaults to "testcontainer" for testing purposes.

'testcontainer'
download_directory str

The local directory you want to download to. Defaults to "./".

'./'
file_path str

The file path you want to download.

''
Source code in src\aalibrary\ingestion.py
def download_file_from_azure_directory(
    directory_client: DataLakeDirectoryClient,
    file_system: str = "testcontainer",
    download_directory: str = "./",
    file_path: str = "",
):
    """Downloads a single file from an azure directory using the
    DataLakeDirectoryClient. Useful for numerous operations, as authentication
    is only required once for the creation of each DataLakeDirectoryClient.

    The file is written into `download_directory` under the last path
    component of `file_path`.

    Args:
        directory_client (DataLakeDirectoryClient): The
            DataLakeDirectoryClient that will be used to connect to and
            download from an azure file system in the data lake.
        file_system (str): The file system (container) you wish to download
            your file from. Defaults to "testcontainer" for testing purposes.
        download_directory (str): The local directory you want to download to.
            Defaults to "./".
        file_path (str): The file path you want to download.
    """

    # User-error-checking
    check_for_assertion_errors(
        data_lake_directory_client=directory_client,
        file_download_directory=download_directory,
    )

    file_client = directory_client.get_file_client(
        file_path=file_path, file_system=file_system
    )

    download_directory = os.path.normpath(download_directory)
    file_name = os.path.basename(os.path.normpath(file_path))
    # `os.path.join` avoids the doubled separator that a plain separator
    # join produces when the directory is a filesystem/drive root.
    local_file_path = os.path.join(download_directory, file_name)

    # Request the download before creating the local file, so that a failing
    # request does not leave an empty file behind.
    download = file_client.download_file()
    # The `with` block closes the file; no explicit close() is needed.
    with open(file=local_file_path, mode="wb") as local_file:
        local_file.write(download.readall())

download_netcdf_file(raw_file_name='', file_type='netcdf', ship_name='', survey_name='', echosounder='', data_source='', file_download_directory='', gcp_bucket=None, debug=False)

ENTRYPOINT FOR END-USERS Downloads a netcdf file from the GCP storage bucket for use on your workstation. Works as follows: 1. Checks if the exact netcdf exists in GCP. a. If it doesn't exist, prompts the user to download it first. b. If it exists, downloads to the file_download_directory.

Parameters:

Name Type Description Default
raw_file_name str

The raw file name (includes extension). Defaults to "".

''
file_type str

The file type (do not include the dot "."). Defaults to "netcdf".

'netcdf'
ship_name str

The ship name associated with this survey. Defaults to "".

''
survey_name str

The survey name/identifier. Defaults to "".

''
echosounder str

The echosounder used to gather the data. Defaults to "".

''
data_source str

The source of the file. Necessary due to the way the storage bucket is organized. Can be one of ["NCEI", "OMAO", "HDD"]. Defaults to "".

''
file_download_directory str

The local directory you want to store your file in. Defaults to "".

''
gcp_bucket bucket

The GCP bucket object used to download the file. Defaults to None.

None
debug bool

Whether or not to print debug statements. Defaults to False.

False
Source code in src\aalibrary\ingestion.py
def download_netcdf_file(
    raw_file_name: str = "",
    file_type: str = "netcdf",
    ship_name: str = "",
    survey_name: str = "",
    echosounder: str = "",
    data_source: str = "",
    file_download_directory: str = "",
    gcp_bucket: storage.Bucket = None,
    debug: bool = False,
):
    """ENTRYPOINT FOR END-USERS
    Downloads a netcdf file from GCP storage bucket for use on your
    workstation.
    Works as follows:
        1. Checks if the exact netcdf exists in gcp.
            a. If it doesn't exist, logs an error asking the user to convert
                and upload the raw file first, then raises.
            b. If it exists, downloads to the `file_download_directory`.

    Args:
        raw_file_name (str, optional): The raw file name (includes extension).
            Defaults to "".
        file_type (str, optional): The file type (do not include the dot ".").
            Defaults to "netcdf".
        ship_name (str, optional): The ship name associated with this survey.
            Defaults to "".
        survey_name (str, optional): The survey name/identifier.
            Defaults to "".
        echosounder (str, optional): The echosounder used to gather the data.
            Defaults to "".
        data_source (str, optional): The source of the file. Necessary due to
            the way the storage bucket is organized. Can be one of
            ["NCEI", "OMAO", "HDD"]. Defaults to "".
        file_download_directory (str, optional): The local directory you want
            to store your file in. Defaults to "".
        gcp_bucket (storage.Bucket, optional): The GCP bucket object
            used to download the file. Defaults to None.
        debug (bool, optional): Whether or not to print debug statements.
            Defaults to False.

    Raises:
        FileNotFoundError: If the netcdf file does not exist in GCP.
    """

    rf = RawFile(
        file_name=raw_file_name,
        file_type=file_type,
        ship_name=ship_name,
        survey_name=survey_name,
        echosounder=echosounder,
        data_source=data_source,
        file_download_directory=file_download_directory,
        gcp_bucket=gcp_bucket,
        debug=debug,
    )

    if not rf.netcdf_file_exists_in_gcp:
        logging.error(
            "NETCDF FILE `%s` DOES NOT EXIST IN GCP AT THE LOCATION: `%s`.",
            raw_file_name,
            rf.netcdf_gcp_storage_bucket_location,
        )
        logging.error(
            "PLEASE CONVERT AND UPLOAD THE RAW FILE FIRST VIA"
            " `download_raw_file`."
        )
        # Carry the details in the exception too, so callers that catch it
        # do not have to rely on the log output.
        raise FileNotFoundError(
            f"NETCDF FILE `{raw_file_name}` DOES NOT EXIST IN GCP AT THE"
            f" LOCATION: `{rf.netcdf_gcp_storage_bucket_location}`."
        )

    print(
        "NETCDF FILE LOCATED IN GCP"
        f": `{rf.netcdf_gcp_storage_bucket_location}`\nDOWNLOADING..."
    )
    # Download through the bucket held by the RawFile, as `download_raw_file`
    # does, rather than the raw argument (which defaults to None).
    # NOTE(review): presumably RawFile resolves a default bucket when
    # `gcp_bucket` is None -- confirm against the RawFile implementation.
    utils.cloud_utils.download_file_from_gcp(
        gcp_bucket=rf.gcp_bucket,
        blob_file_path=rf.netcdf_gcp_storage_bucket_location,
        local_file_path=rf.netcdf_file_download_path,
        debug=debug,
    )
    print(
        f"FILE `{raw_file_name}` DOWNLOADED "
        f"TO `{rf.netcdf_file_download_path}`"
    )

download_raw_file(file_name='', ship_name='', survey_name='', echosounder='', data_source='', file_download_directory='.', gcp_bucket=None, debug=False)

ENTRYPOINT FOR END-USERS Downloads a raw and idx file from NCEI for use on your workstation. Works as follows: 1. Checks if raw file exists in GCP. a. If it exists, checks if a netcdf version also exists or not lets the user know. i. If force_download_from_ncei is True downloads the raw and idx file from NCEI instead. b. If it doesn't exist, downloads .raw from NCEI and uploads to GCP for caching downloads .idx from NCEI and uploads to GCP for caching

Parameters:

Name Type Description Default
file_name str

The file name (includes extension). Defaults to "".

''
ship_name str

The ship name associated with this survey. Defaults to "".

''
survey_name str

The survey name/identifier. Defaults to "".

''
echosounder str

The echosounder used to gather the data. Defaults to "".

''
data_source str

The source of the file. Necessary due to the way the storage bucket is organized. Can be one of ["NCEI", "OMAO", "HDD"]. Defaults to "".

''
file_download_directory str

The local file directory you want to store your file in. Defaults to current directory. Defaults to ".".

'.'
gcp_bucket bucket

The GCP bucket object used to download the file. Defaults to None.

None
debug bool

Whether or not to print debug statements. Defaults to False.

False
Source code in src\aalibrary\ingestion.py
def download_raw_file(
    file_name: str = "",
    ship_name: str = "",
    survey_name: str = "",
    echosounder: str = "",
    data_source: str = "",
    file_download_directory: str = ".",
    gcp_bucket: storage.Bucket = None,
    debug: bool = False,
):
    """ENTRYPOINT FOR END-USERS
    Downloads a raw file, along with its idx and bot files, for use on your
    workstation.
    Works as follows:
        1. Checks if the raw file exists in GCP.
            a. If it exists,
                checks if a netcdf version also exists or not,
                lets the user know,
                and downloads the raw file from GCP.
            b. If it doesn't exist in GCP but exists in NCEI,
                calls `download_raw_file_from_ncei` with
                `upload_to_gcp=True`.
        2. For the idx file, and then for the bot file:
            a. If it exists in GCP, downloads it from GCP.
            b. Otherwise, if it exists in NCEI, downloads it from NCEI and
                uploads it to GCP for caching.

    Args:
        file_name (str, optional): The file name (includes extension).
            Defaults to "".
        ship_name (str, optional): The ship name associated with this survey.
            Defaults to "".
        survey_name (str, optional): The survey name/identifier. Defaults
            to "".
        echosounder (str, optional): The echosounder used to gather the data.
            Defaults to "".
        data_source (str, optional): The source of the file. Necessary due to
            the way the storage bucket is organized. Can be one of
            ["NCEI", "OMAO", "HDD"]. Defaults to "".
        file_download_directory (str, optional): The local file directory you
            want to store your file in. Defaults to the current directory
            (".").
        gcp_bucket (storage.Bucket, optional): The GCP bucket object
            used to download the file. Defaults to None.
        debug (bool, optional): Whether or not to print debug statements.
            Defaults to False.
    """

    rf = RawFile(
        file_name=file_name,
        file_type="raw",
        ship_name=ship_name,
        survey_name=survey_name,
        echosounder=echosounder,
        data_source=data_source,
        file_download_directory=file_download_directory,
        debug=debug,
        gcp_bucket=gcp_bucket,
    )

    if rf.raw_file_exists_in_gcp:
        # Inform user if file exists in GCP.
        print(
            f"INFO: FILE `{rf.raw_file_name}` ALREADY EXISTS IN"
            " GOOGLE STORAGE BUCKET."
        )
        # Here we download the raw file from GCP. We also check for a netcdf
        # version and let the user know.
        print("CHECKING FOR NETCDF VERSION...")
        if rf.netcdf_file_exists_in_gcp:
            # Inform the user if a netcdf version exists in cache.
            print(
                (
                    f"FILE `{rf.raw_file_name}` EXISTS AS A NETCDF ALREADY."
                    " PLEASE DOWNLOAD THE NETCDF VERSION IF NEEDED."
                )
            )
        else:
            print(
                (
                    f"FILE `{rf.raw_file_name}` DOES NOT EXIST AS NETCDF."
                    " CONSIDER RUNNING A CONVERSION FUNCTION"
                )
            )

        # Here we download the raw from GCP.
        print(
            (
                f"DOWNLOADING FILE `{rf.raw_file_name}` FROM GCP TO"
                f" `{rf.raw_file_download_path}`"
            )
        )
        utils.cloud_utils.download_file_from_gcp(
            gcp_bucket=rf.gcp_bucket,
            blob_file_path=rf.raw_gcp_storage_bucket_location,
            local_file_path=rf.raw_file_download_path,
            debug=rf.debug,
        )
        print("DOWNLOADED.")

    elif rf.raw_file_exists_in_ncei:
        # File does not exist in gcp and needs to be downloaded from NCEI.
        # Forward the bucket so the upload goes to the same bucket this
        # function is working against instead of falling back to the
        # callee's default of None.
        download_raw_file_from_ncei(
            file_name=rf.raw_file_name,
            file_type="raw",
            ship_name=rf.ship_name,
            survey_name=rf.survey_name,
            echosounder=rf.echosounder,
            file_download_directory=rf.file_download_directory,
            upload_to_gcp=True,
            gcp_bucket=rf.gcp_bucket,
            debug=rf.debug,
        )

    # Checking to make sure the idx exists in GCP...
    if rf.idx_file_exists_in_gcp:
        print("CORRESPONDING IDX FILE FOUND IN GCP. DOWNLOADING...")
        # Here we download the idx from GCP.
        print(
            (
                f"DOWNLOADING FILE `{rf.idx_file_name}` FROM GCP TO "
                f"`{rf.idx_file_download_path}`"
            )
        )
        utils.cloud_utils.download_file_from_gcp(
            gcp_bucket=rf.gcp_bucket,
            blob_file_path=rf.idx_gcp_storage_bucket_location,
            local_file_path=rf.idx_file_download_path,
            debug=rf.debug,
        )
        print("DOWNLOADED.")
    elif rf.idx_file_exists_in_ncei:
        print(
            (
                "CORRESPONDING IDX FILE NOT FOUND IN GCP."
                " DOWNLOADING FROM NCEI AND UPLOADING TO GCP..."
            )
        )
        # Safely download and upload the idx file.
        download_single_file_from_aws(
            file_url=rf.idx_file_ncei_url,
            download_location=rf.idx_file_download_path,
        )
        # Upload to GCP at the correct storage bucket location.
        upload_file_to_gcp_storage_bucket(
            file_name=rf.idx_file_name,
            file_type="idx",
            ship_name=rf.ship_name,
            survey_name=rf.survey_name,
            echosounder=rf.echosounder,
            file_location=rf.idx_file_download_path,
            gcp_bucket=rf.gcp_bucket,
            data_source=rf.data_source,
            debug=rf.debug,
        )

    # Checking to make sure the bot exists in GCP...
    if rf.bot_file_exists_in_gcp:
        print("CORRESPONDING BOT FILE FOUND IN GCP. DOWNLOADING...")
        # Here we download the bot from GCP.
        print(
            (
                f"DOWNLOADING FILE `{rf.bot_file_name}` FROM GCP"
                f" TO `{rf.bot_file_download_path}`"
            )
        )
        utils.cloud_utils.download_file_from_gcp(
            gcp_bucket=rf.gcp_bucket,
            blob_file_path=rf.bot_gcp_storage_bucket_location,
            local_file_path=rf.bot_file_download_path,
            debug=rf.debug,
        )
        print("DOWNLOADED.")
    elif rf.bot_file_exists_in_ncei:
        print(
            (
                "CORRESPONDING BOT FILE NOT FOUND IN GCP. TRYING TO "
                "DOWNLOAD FROM NCEI AND UPLOADING TO GCP..."
            )
        )
        # Safely download and upload the bot file.
        download_single_file_from_aws(
            file_url=rf.bot_file_ncei_url,
            download_location=rf.bot_file_download_path,
        )
        # Upload to GCP at the correct storage bucket location.
        upload_file_to_gcp_storage_bucket(
            file_name=rf.bot_file_name,
            file_type="bot",
            ship_name=rf.ship_name,
            survey_name=rf.survey_name,
            echosounder=rf.echosounder,
            file_location=rf.bot_file_download_path,
            gcp_bucket=rf.gcp_bucket,
            data_source=rf.data_source,
            debug=rf.debug,
        )

download_raw_file_from_azure(file_name='', file_type='raw', ship_name='', survey_name='', echosounder='', file_download_directory='.', config_file_path='', upload_to_gcp=False, gcp_bucket=None, debug=False)

ENTRYPOINT FOR END-USERS Downloads a raw, idx, and bot file from OMAO. If upload_to_gcp is enabled, the downloaded files will also upload to the GCP storage bucket if they do not exist.

Parameters:

Name Type Description Default
file_name str

The file name (includes extension). Defaults to "".

''
file_type str

The file type (do not include the dot "."). Defaults to "raw".

'raw'
ship_name str

The ship name associated with this survey. Defaults to "".

''
survey_name str

The survey name/identifier. Defaults to "".

''
echosounder str

The echosounder used to gather the data. Defaults to "".

''
file_download_directory str

The local directory you want to store your file in. Defaults to current directory. Defaults to ".".

'.'
config_file_path str

The location of the config file. Needs a [DEFAULT] section with a azure_connection_string variable defined. Defaults to "".

''
upload_to_gcp bool

Whether or not you want to upload to GCP. Defaults to False.

False
gcp_bucket bucket

The GCP bucket object used to download the file. Defaults to None.

None
debug bool

Whether or not to print debug statements. Defaults to False.

False
Source code in src\aalibrary\ingestion.py
def download_raw_file_from_azure(
    file_name: str = "",
    file_type: str = "raw",
    ship_name: str = "",
    survey_name: str = "",
    echosounder: str = "",
    file_download_directory: str = ".",
    config_file_path: str = "",
    upload_to_gcp: bool = False,
    gcp_bucket: storage.Bucket = None,
    debug: bool = False,
):
    """ENTRYPOINT FOR END-USERS
    Downloads a raw, idx, and bot file from OMAO. If `upload_to_gcp` is
    enabled, the downloaded files will also upload to the GCP storage bucket
    if they do not exist.

    Args:
        file_name (str, optional): The file name (includes extension).
            Defaults to "".
        file_type (str, optional): The file type (do not include the dot ".").
            Defaults to "raw".
        ship_name (str, optional): The ship name associated with this survey.
            Defaults to "".
        survey_name (str, optional): The survey name/identifier.
            Defaults to "".
        echosounder (str, optional): The echosounder used to gather the data.
            Defaults to "".
        file_download_directory (str, optional): The local directory you want
            to store your file in. Defaults to the current directory (".").
        config_file_path (str, optional): The location of the config file.
            Needs a `[DEFAULT]` section with a `azure_connection_string`
            variable defined. Defaults to "".
        upload_to_gcp (bool, optional): Whether or not you want to upload to
            GCP. Defaults to False.
        gcp_bucket (storage.Bucket, optional): The GCP bucket object
            used to upload the files. Defaults to None.
        debug (bool, optional): Whether or not to print debug statements.
            Defaults to False.
    """

    rf = RawFile(
        file_name=file_name,
        file_type=file_type,
        ship_name=ship_name,
        survey_name=survey_name,
        echosounder=echosounder,
        data_source="OMAO",
        file_download_directory=file_download_directory,
        is_metadata=False,
        upload_to_gcp=upload_to_gcp,
        debug=debug,
        gcp_bucket=gcp_bucket,
    )

    # Location of temporary file in sandbox environment.
    # https://contracttest4.blob.core.windows.net/testcontainer/Reuben_Lasker/RL_1601/EK_60/1601RL-D20160107-T074016.bot

    # Create Azure Directory Client
    azure_datalake_directory_client = get_data_lake_directory_client(
        config_file_path=config_file_path
    )

    if rf.raw_file_exists_in_omao:
        print(f"DOWNLOADING FILE {rf.raw_file_name} FROM OMAO")
        download_file_from_azure_directory(
            directory_client=azure_datalake_directory_client,
            download_directory=rf.file_download_directory,
            file_path=rf.raw_omao_file_path,
        )
    if rf.idx_file_exists_in_omao:
        # Force download the idx file.
        print(f"DOWNLOADING IDX FILE {rf.idx_file_name} FROM OMAO")
        download_file_from_azure_directory(
            directory_client=azure_datalake_directory_client,
            download_directory=rf.file_download_directory,
            file_path=rf.idx_omao_file_path,
        )
    if rf.bot_file_exists_in_omao:
        # Force download the bot file.
        print(f"DOWNLOADING BOT FILE {rf.bot_file_name} FROM OMAO")
        download_file_from_azure_directory(
            directory_client=azure_datalake_directory_client,
            download_directory=rf.file_download_directory,
            file_path=rf.bot_omao_file_path,
        )

    if upload_to_gcp:
        if rf.raw_file_exists_in_gcp:
            print(
                (
                    "INFO: RAW FILE ALREADY EXISTS IN GCP AT "
                    f"`{rf.raw_gcp_storage_bucket_location}`"
                )
            )
        else:
            # TODO: try out a background process if possible -- file might
            # have a lock. only async options, otherwise subprocess gsutil to
            # upload it.
            # Upload raw to GCP at the correct storage bucket location.
            upload_file_to_gcp_storage_bucket(
                file_name=rf.file_name,
                file_type="raw",
                ship_name=rf.ship_name,
                survey_name=rf.survey_name,
                echosounder=rf.echosounder,
                file_location=rf.raw_file_download_path,
                gcp_bucket=rf.gcp_bucket,
                data_source=rf.data_source,
                debug=debug,
            )
            # Upload the raw metadata file as well.
            metadata.create_and_upload_metadata_df_for_raw(
                rf=rf,
                debug=debug,
            )

        if rf.idx_file_exists_in_gcp:
            print(
                (
                    "INFO: IDX FILE ALREADY EXISTS IN GCP AT "
                    f"`{rf.idx_gcp_storage_bucket_location}`"
                )
            )
        elif rf.idx_file_exists_in_omao:
            # Upload idx to GCP at the correct storage bucket location.
            upload_file_to_gcp_storage_bucket(
                file_name=rf.idx_file_name,
                file_type="idx",
                ship_name=rf.ship_name,
                survey_name=rf.survey_name,
                echosounder=rf.echosounder,
                file_location=rf.idx_file_download_path,
                gcp_bucket=rf.gcp_bucket,
                data_source=rf.data_source,
                is_metadata=False,
                debug=debug,
            )

        if rf.bot_file_exists_in_gcp:
            print(
                (
                    "INFO: BOT FILE ALREADY EXISTS IN GCP AT"
                    f" `{rf.bot_gcp_storage_bucket_location}`"
                )
            )
        elif rf.bot_file_exists_in_omao:
            # The bot file is only downloaded above when it exists in OMAO,
            # so the upload is gated on the same OMAO check (as the idx
            # branch is) rather than on its existence in NCEI.
            # Upload bot to GCP at the correct storage bucket location.
            upload_file_to_gcp_storage_bucket(
                file_name=rf.bot_file_name,
                file_type="bot",
                ship_name=rf.ship_name,
                survey_name=rf.survey_name,
                echosounder=rf.echosounder,
                file_location=rf.bot_file_download_path,
                gcp_bucket=rf.gcp_bucket,
                data_source=rf.data_source,
                is_metadata=False,
                debug=debug,
            )

download_raw_file_from_ncei(file_name='', file_type='raw', ship_name='', survey_name='', echosounder='', file_download_directory='.', upload_to_gcp=False, gcp_bucket=None, debug=False)

ENTRYPOINT FOR END-USERS Downloads a raw, idx, and bot file from NCEI. If upload_to_gcp is enabled, the downloaded files will also upload to the GCP storage bucket if they do not exist.

Parameters:

Name Type Description Default
file_name str

The file name (includes extension). Defaults to "".

''
file_type str

The file type (do not include the dot "."). Defaults to "raw".

'raw'
ship_name str

The ship name associated with this survey. Defaults to "".

''
survey_name str

The survey name/identifier. Defaults to "".

''
echosounder str

The echosounder used to gather the data. Defaults to "".

''
file_download_directory str

The local file directory you want to store your file in. Defaults to current directory. Defaults to ".".

'.'
upload_to_gcp bool

Whether or not you want to upload to GCP. Defaults to False.

False
gcp_bucket bucket

The GCP bucket object used to download the file. Defaults to None.

None
debug bool

Whether or not to print debug statements. Defaults to False.

False
Source code in src\aalibrary\ingestion.py
def download_raw_file_from_ncei(
    file_name: str = "",
    file_type: str = "raw",
    ship_name: str = "",
    survey_name: str = "",
    echosounder: str = "",
    file_download_directory: str = ".",
    upload_to_gcp: bool = False,
    gcp_bucket: storage.Client.bucket = None,
    debug: bool = False,
):
    """ENTRYPOINT FOR END-USERS
    Downloads a raw, idx, and bot file from NCEI. If `upload_to_gcp` is
    enabled, the downloaded files will also upload to the GCP storage bucket
    if they do not exist.

    Args:
        file_name (str, optional): The file name (includes extension).
            Defaults to "".
        file_type (str, optional): The file type (do not include the dot ".").
            Defaults to "raw".
        ship_name (str, optional): The ship name associated with this survey.
            Defaults to "".
        survey_name (str, optional): The survey name/identifier.
            Defaults to "".
        echosounder (str, optional): The echosounder used to gather the data.
            Defaults to "".
        file_download_directory (str, optional): The local file directory you
            want to store your file in. Defaults to the current directory
            (".").
        upload_to_gcp (bool, optional): Whether or not you want to upload to
            GCP. Defaults to False.
        gcp_bucket (storage.Client.bucket, optional): The GCP bucket object
            handed to `RawFile` and used for the uploads. Defaults to None.
        debug (bool, optional): Whether or not to print debug statements.
            Defaults to False.
    """

    rf = RawFile(
        file_name=file_name,
        file_type=file_type,
        ship_name=ship_name,
        survey_name=survey_name,
        echosounder=echosounder,
        data_source="NCEI",
        file_download_directory=file_download_directory,
        upload_to_gcp=upload_to_gcp,
        gcp_bucket=gcp_bucket,
        debug=debug,
    )

    # Download every companion file that NCEI actually has. The idx and bot
    # files are always (re-)downloaded when they exist.
    if rf.raw_file_exists_in_ncei:
        download_single_file_from_aws(
            file_url=rf.raw_file_ncei_url,
            download_location=rf.raw_file_download_path,
        )
    if rf.idx_file_exists_in_ncei:
        download_single_file_from_aws(
            file_url=rf.idx_file_ncei_url,
            download_location=rf.idx_file_download_path,
        )
    if rf.bot_file_exists_in_ncei:
        download_single_file_from_aws(
            file_url=rf.bot_file_ncei_url,
            download_location=rf.bot_file_download_path,
        )

    if not upload_to_gcp:
        return

    if rf.raw_file_exists_in_gcp:
        print(
            (
                "INFO: RAW FILE ALREADY EXISTS IN GCP AT "
                f"`{rf.raw_gcp_storage_bucket_location}`"
            )
        )
    else:
        # TODO: try out a background process if possible -- file might
        # have a lock. only async options, otherwise subprocess gsutil to
        # upload it.
        # NOTE(review): unlike the idx/bot branches below, this branch does
        # not check `rf.raw_file_exists_in_ncei` before uploading -- confirm
        # that this is intended.

        # Upload raw to GCP at the correct storage bucket location.
        upload_file_to_gcp_storage_bucket(
            file_name=rf.file_name,
            file_type="raw",
            ship_name=rf.ship_name,
            survey_name=rf.survey_name,
            echosounder=rf.echosounder,
            file_location=rf.raw_file_download_path,
            gcp_bucket=rf.gcp_bucket,
            data_source=rf.data_source,
            debug=rf.debug,
        )
        # Upload the raw metadata file as well.
        metadata.create_and_upload_metadata_df_for_raw(
            rf=rf,
            debug=rf.debug,
        )

    if rf.idx_file_exists_in_gcp:
        print(
            (
                "INFO: IDX FILE ALREADY EXISTS IN GCP AT "
                f"`{rf.idx_gcp_storage_bucket_location}`"
            )
        )
    elif rf.idx_file_exists_in_ncei:
        # Upload idx to GCP at the correct storage bucket location. Use the
        # echosounder stored on `rf`, exactly like the raw and bot uploads,
        # so all three files are uploaded with the same value.
        upload_file_to_gcp_storage_bucket(
            file_name=rf.idx_file_name,
            file_type="idx",
            ship_name=rf.ship_name,
            survey_name=rf.survey_name,
            echosounder=rf.echosounder,
            file_location=rf.idx_file_download_path,
            gcp_bucket=rf.gcp_bucket,
            data_source=rf.data_source,
            is_metadata=False,
            debug=rf.debug,
        )

    if rf.bot_file_exists_in_gcp:
        print(
            (
                "INFO: BOT FILE ALREADY EXISTS IN GCP AT "
                f"`{rf.bot_gcp_storage_bucket_location}`"
            )
        )
    elif rf.bot_file_exists_in_ncei:
        # Upload bot to GCP at the correct storage bucket location.
        upload_file_to_gcp_storage_bucket(
            file_name=rf.bot_file_name,
            file_type="bot",
            ship_name=rf.ship_name,
            survey_name=rf.survey_name,
            echosounder=rf.echosounder,
            file_location=rf.bot_file_download_path,
            gcp_bucket=rf.gcp_bucket,
            data_source=rf.data_source,
            is_metadata=False,
            debug=rf.debug,
        )

download_specific_file_from_azure(config_file_path='', container_name='testcontainer', file_path_in_container='')

Creates a DataLakeFileClient and downloads a specific file from container_name.

Parameters:

Name Type Description Default
config_file_path str

The location of the config file. Needs a [DEFAULT] section with a azure_connection_string variable defined. Defaults to "".

''
container_name str

The container within Azure Data Lake you are trying to access. Defaults to "testcontainer".

'testcontainer'
file_path_in_container str

The file path of the file you would like downloaded. Defaults to "".

''
Source code in src\aalibrary\ingestion.py
def download_specific_file_from_azure(
    config_file_path: str = "",
    container_name: str = "testcontainer",
    file_path_in_container: str = "",
):
    """Downloads one file from an Azure Data Lake container into the current
    working directory, using a `DataLakeFileClient`.

    The local copy is named after the last "/"-separated component of
    `file_path_in_container`.

    Args:
        config_file_path (str, optional): The location of the config file.
            Needs a `[DEFAULT]` section with a `azure_connection_string`
            variable defined. Defaults to "".
        container_name (str, optional): The container within Azure Data Lake
            you are trying to access. Defaults to "testcontainer".
        file_path_in_container (str, optional): The file path of the file you
            would like downloaded. Defaults to "".
    """

    # Pull the connection string out of the user's config file.
    parser = configparser.ConfigParser()
    parser.read(config_file_path)
    connection_string = parser["DEFAULT"]["azure_connection_string"]

    file_client = DataLakeFileClient.from_connection_string(
        connection_string,
        file_system_name=container_name,
        file_path=file_path_in_container,
    )

    # Keep only the file name; the container's folder structure is dropped.
    local_name = file_path_in_container.rsplit("/", 1)[-1]

    with open(f"./{local_name}", "wb") as local_file:
        file_client.download_file().readinto(local_file)

download_survey_from_ncei(ship_name='', survey_name='', download_directory='', max_limit=None, debug=False)

Downloads an entire survey from NCEI to a local directory while maintaining folder structure.

Parameters:

Name Type Description Default
ship_name str

The ship name. Defaults to "".

''
survey_name str

The name of the survey you would like to download. Defaults to "".

''
download_directory str

The directory to which the files will be downloaded. Creates a directory in the cwd if not specified. Defaults to "". NOTE: The directory specified will have the ship_name/survey_name folders created within it.

''
max_limit int

The maximum number of files to download. Defaults to None, which downloads all files.

None
debug bool

Whether or not you want to print debug statements. Defaults to False.

False
Source code in src\aalibrary\ingestion.py
def download_survey_from_ncei(
    ship_name: str = "",
    survey_name: str = "",
    download_directory: str = "",
    max_limit: int = None,
    debug: bool = False,
):
    """Downloads an entire survey from NCEI to a local directory while
    maintaining folder structure.

    Args:
        ship_name (str, optional): The ship name. Defaults to "".
        survey_name (str, optional): The name of the survey you would like to
            download. Defaults to "".
        download_directory (str, optional): The directory to which the files
            will be downloaded. Creates a directory in the cwd if not
            specified. Defaults to "".
            NOTE: The directory specified will have the `ship_name/survey_name`
            folders created within it.
        max_limit (int, optional): The maximum number of files to download,
            taken from the front of the S3 listing. Defaults to None, which
            downloads all files.
        debug (bool, optional): Whether or not you want to print debug
            statements. Defaults to False.
    """

    # User-error-checking
    # Normalize ship name to NCEI format
    if ship_name:
        ship_name = utils.ncei_utils.get_closest_ncei_formatted_ship_name(
            ship_name
        )

    # An empty `download_directory` means "relative to the cwd". Either way
    # the `ship_name/survey_name` folders are appended to it.
    base_directory = download_directory if download_directory != "" else "./"
    download_directory = os.path.normpath(
        os.sep.join(
            [
                os.path.normpath(base_directory),
                f"{ship_name}",
                f"{survey_name}",
            ]
        )
    )

    # Create the directory if it doesn't exist.
    os.makedirs(download_directory, exist_ok=True)
    print("CREATED DOWNLOAD DIRECTORY.")

    if debug:
        print(f"FORMATTED DOWNLOAD DIRECTORY: {download_directory}")

    # Get all s3 objects for the survey
    survey_prefix = f"data/raw/{ship_name}/{survey_name}/"
    print(f"GETTING ALL S3 OBJECTS FOR SURVEY {survey_name}...", end="")
    _, s3_resource, _ = utils.cloud_utils.create_s3_objs()
    s3_objects = cloud_utils.list_all_objects_in_s3_bucket_location(
        prefix=survey_prefix,
        s3_resource=s3_resource,
        return_full_paths=True,
    )
    # Keys ending in "/" are folder placeholders, not files. Drop them once
    # here so that neither the subdirectory pass nor the download pass (nor
    # `max_limit`) ever sees them.
    file_keys = [key for key in s3_objects if not key.endswith("/")]
    print(f"FOUND {len(file_keys)} FILES.")

    # Create all the subdirectories first
    print("CREATING SUBDIRECTORIES...", end="")
    # The subfolder of a file is its key, minus the survey prefix and minus
    # the trailing file name.
    subdirs = {
        os.sep.join(key.replace(survey_prefix, "").split("/")[:-1])
        for key in file_keys
    }
    for subdir in subdirs:
        os.makedirs(os.sep.join([download_directory, subdir]), exist_ok=True)
    print("SUBDIRECTORIES CREATED.")

    # Slicing with `None`, or with a limit larger than the list, yields the
    # whole list, so no explicit clamping of `max_limit` is required.
    for object_key in tqdm(file_keys[:max_limit], desc="Downloading"):
        local_object_path = object_key.replace(survey_prefix, "")
        download_location = os.path.normpath(
            os.sep.join([download_directory, local_object_path])
        )
        download_single_file_from_aws(
            file_url=object_key, download_location=download_location
        )
    print(f"DOWNLOAD COMPLETE {os.path.abspath(download_directory)}.")

find_and_upload_survey_metadata_from_s3(ship_name='', survey_name='', gcp_bucket=None, debug=False)

Finds the metadata that is associated with a particular survey in s3, then uploads all of those files into the correct gcp location.

Parameters:

Name Type Description Default
ship_name str

The ship name associated with this survey. Defaults to "".

''
survey_name str

The survey name/identifier. Defaults to "".

''
gcp_bucket bucket

The GCP bucket object used to download the file. Defaults to None.

None
debug bool

Whether or not to print debug statements. Defaults to False.

False
Source code in src\aalibrary\ingestion.py
def find_and_upload_survey_metadata_from_s3(
    ship_name: str = "",
    survey_name: str = "",
    gcp_bucket: storage.Client.bucket = None,
    debug: bool = False,
):
    """Finds the metadata that is associated with a particular survey in s3,
    then uploads all of those files into the correct gcp location.

    Each metadata object is downloaded to a temporary file in the current
    working directory, uploaded, and then removed again.

    Args:
        ship_name (str, optional): The ship name associated with this survey.
            Defaults to "".
        survey_name (str, optional): The survey name/identifier. Defaults
            to "".
        gcp_bucket (storage.Client.bucket, optional): The GCP bucket object
            the metadata files are uploaded to. Defaults to None.
        debug (bool, optional): Whether or not to print debug statements.
            Defaults to False.

    Raises:
        Exception: Re-raises whatever `create_s3_objs` raised when the S3
            connection cannot be established.
    """

    metadata_location_in_s3 = f"data/raw/{ship_name}/{survey_name}/metadata/"

    try:
        # The bucket is needed for counting, the resource for listing.
        _, s3_resource, s3_bucket = utils.cloud_utils.create_s3_objs()
    except Exception as e:
        logging.error("CANNOT ESTABLISH CONNECTION TO S3 BUCKET..\n%s", e)
        raise

    num_metadata_objects = cloud_utils.count_objects_in_s3_bucket_location(
        prefix=metadata_location_in_s3, bucket=s3_bucket
    )

    if debug:
        logging.debug(
            "%d num_metadata_objects FOUND IN S3 FOR %s - %s",
            num_metadata_objects,
            ship_name,
            survey_name,
        )

    if num_metadata_objects < 1:
        return

    # Get object keys. Pass the S3 *resource* here (not the bucket), the same
    # way `download_survey_from_ncei` calls this helper.
    s3_objects = cloud_utils.list_all_objects_in_s3_bucket_location(
        prefix=metadata_location_in_s3, s3_resource=s3_resource
    )
    # NOTE(review): assumes the listing yields (full_path, file_name) pairs
    # when `return_full_paths` is not set -- TODO confirm against cloud_utils.
    for full_path, file_name in s3_objects:
        # Temporary local copy, placed in the current working directory.
        local_file_path = os.sep.join([os.path.normpath("./"), file_name])
        try:
            # Download from aws
            download_single_file_from_aws(
                file_url=full_path, download_location=local_file_path
            )
            # Upload to gcp
            upload_file_to_gcp_storage_bucket(
                file_name=file_name,
                ship_name=ship_name,
                survey_name=survey_name,
                file_location=local_file_path,
                gcp_bucket=gcp_bucket,
                data_source="NCEI",
                is_metadata=False,
                is_survey_metadata=True,
                debug=debug,
            )
        finally:
            # Remove the temporary file even when the transfer failed, but
            # do not mask the original error if it was never created.
            if os.path.exists(local_file_path):
                os.remove(local_file_path)

find_data_source_for_file()

Finds the data source of a given filename by checking all possible data sources.

Source code in src\aalibrary\ingestion.py
def find_data_source_for_file():
    """Placeholder for locating the data source of a given filename by
    checking all possible data sources.

    Not implemented yet: the function takes no arguments and returns None.
    """
    return None

metadata

This file contains functions that have to do with the metadata DB that resides in BigQuery.

Functions:

Name Description
create_and_upload_metadata_df_for_netcdf

Creates a metadata file with appropriate information for netcdf files.

create_and_upload_metadata_df_for_raw

Creates a metadata file with appropriate information. Then uploads it

create_metadata_json_for_netcdf_files

Creates a JSON object containing metadata for the current user.

create_metadata_json_for_raw_files

Creates a JSON object containing metadata for the current user.

delay_file_deletion

Delays a file's DELETION_DATETIME by the number of days specified.

get_current_gcp_user_email

Gets the current gcloud user's email.

get_deletion_datetime_of_file

Gets the DELETION_DATETIME of a file. Returns a datetime object.

get_metadata_in_df_format

Retrieves the metadata associated with all objects in GCP in DataFrame

upload_ncei_metadata_df_to_bigquery

Finds the metadata obtained from a survey on NCEI, and uploads it to the

create_and_upload_metadata_df_for_netcdf(rf=None, debug=False)

Creates a metadata file with appropriate information for netcdf files. Then uploads it to the correct table in GCP.

Parameters:

Name Type Description Default
rf RawFile

The RawFile object associated with this file. Defaults to None.

None
debug bool

Whether or not to print debug statements. Defaults to False.

False
Source code in src\aalibrary\metadata.py
def create_and_upload_metadata_df_for_netcdf(
    rf: RawFile = None,
    debug: bool = False,
):
    """Builds the metadata row for a netcdf file and appends it to the
    `metadata.aalibrary_netcdf_metadata` table in BigQuery.

    Args:
        rf (RawFile, optional): The RawFile object associated with this file.
            Defaults to None.
        debug (bool, optional): Whether or not to print debug statements.
            Defaults to False.
    """

    # Build the frame and hand it straight to BigQuery; rows are appended to
    # the table in the currently configured GCP project.
    pandas_gbq.to_gbq(
        dataframe=create_metadata_json_for_netcdf_files(rf=rf, debug=debug),
        destination_table="metadata.aalibrary_netcdf_metadata",
        project_id=config.get_current_gcp_project_id(),
        if_exists="append",
    )

create_and_upload_metadata_df_for_raw(rf=None, debug=False)

Creates a metadata file with appropriate information. Then uploads it to the correct table in GCP. Used for .raw files.

Parameters:

Name Type Description Default
rf RawFile

The RawFile object associated with this file. Defaults to None.

None
debug bool

Whether or not to print debug statements. Defaults to False.

False
Source code in src\aalibrary\metadata.py
def create_and_upload_metadata_df_for_raw(
    rf: RawFile = None,
    debug: bool = False,
):
    """Builds the metadata row for a .raw file and appends it to the
    `metadata.aalibrary_file_metadata` table in BigQuery.

    Args:
        rf (RawFile, optional): The RawFile object associated with this file.
            Defaults to None.
        debug (bool, optional): Whether or not to print debug statements.
            Defaults to False.
    """

    raw_metadata_frame = create_metadata_json_for_raw_files(rf=rf, debug=debug)

    # Append (never replace) so existing metadata rows are kept.
    upload_options = {
        "destination_table": "metadata.aalibrary_file_metadata",
        "project_id": config.get_current_gcp_project_id(),
        "if_exists": "append",
    }
    pandas_gbq.to_gbq(dataframe=raw_metadata_frame, **upload_options)

create_metadata_json_for_netcdf_files(rf=None, debug=False)

Creates a JSON object containing metadata for the current user.

Parameters:

Name Type Description Default
rf RawFile

The RawFile object associated with this file. Defaults to None.

None
debug bool

Whether or not to print out the metadata json. Defaults to False.

False

Returns:

Type Description
DataFrame

pd.DataFrame: The metadata dataframe for the aalibrary_netcdf_metadata database table.

Source code in src\aalibrary\metadata.py
def create_metadata_json_for_netcdf_files(
    rf: RawFile = None,
    debug: bool = False,
) -> pd.DataFrame:
    """Creates a one-row DataFrame of metadata for a netcdf file, recording
    the current gcloud user as the uploader.

    `DATE_CREATED` and `DELETION_DATETIME` are both derived from the same UTC
    timestamp; the deletion datetime is 90 days after creation.

    Args:
        rf (RawFile, optional): The RawFile object associated with this file.
            Defaults to None.
        debug (bool, optional): Whether or not to print out the metadata json.
            Defaults to False.

    Returns:
        pd.DataFrame: The metadata dataframe for the
            `aalibrary_netcdf_metadata` database table.
    """

    datetime_format = "%Y-%m-%d %H:%M:%S"

    # Get the current user's email
    email = get_current_gcp_user_email()

    # get the survey datetime.
    file_datetime = datetime.strptime(
        rf.get_file_datetime_str(), datetime_format
    )

    # Take "now" once, in UTC, so that the creation and deletion datetimes
    # share one clock and are exactly 90 days apart.
    now_utc = datetime.now(timezone.utc)
    deletion_datetime = (now_utc + timedelta(days=90)).strftime(
        datetime_format
    )

    metadata_json = {
        "FILE_NAME": rf.netcdf_file_name,
        "DATE_CREATED": now_utc.strftime(datetime_format),
        "UPLOADED_BY": email,
        "ECHOPYPE_VERSION": echopype.__version__,
        "PYTHON_VERSION": sys.version.split(" ")[0],
        "NUMPY_VERSION": np.version.version,
        # maybe just add in echopype's reqs.
        # pip lock file - for current environment
        "NCEI_CRUISE_ID": rf.survey_name,
        "GCP_URI": rf.netcdf_gcp_storage_bucket_location,
        "FILE_DATETIME": file_datetime,
        "DELETION_DATETIME": deletion_datetime,
        "ICES_CODE": rf.ices_code,
    }

    aalibrary_metadata_df = pd.json_normalize(metadata_json)
    # make sure data types are conserved before upload to BigQuery.
    for column in ("DATE_CREATED", "FILE_DATETIME", "DELETION_DATETIME"):
        aalibrary_metadata_df[column] = pd.to_datetime(
            aalibrary_metadata_df[column], format=datetime_format
        )

    if debug:
        print(aalibrary_metadata_df)
        logging.debug(aalibrary_metadata_df)

    return aalibrary_metadata_df

create_metadata_json_for_raw_files(rf=None, debug=False)

Creates a JSON object containing metadata for the current user.

Parameters:

Name Type Description Default
rf RawFile

The RawFile object associated with this file. Defaults to None.

None
debug bool

Whether or not to print out the metadata json. Defaults to False.

False

Returns:

Type Description
DataFrame

pd.DataFrame: The metadata dataframe for the aalibrary_file_metadata database table.

Source code in src\aalibrary\metadata.py
def create_metadata_json_for_raw_files(
    rf: RawFile = None,
    debug: bool = False,
) -> pd.DataFrame:
    """Creates a one-row DataFrame of metadata for a .raw file (and its idx
    and bot companions), recording the current gcloud user as the uploader.

    `DATE_CREATED` and `DELETION_DATETIME` are both derived from the same UTC
    timestamp; the deletion datetime is 90 days after creation.

    Args:
        rf (RawFile, optional): The RawFile object associated with this file.
            Defaults to None.
        debug (bool, optional): Whether or not to print out the metadata json.
            Defaults to False.

    Returns:
        pd.DataFrame: The metadata dataframe for the `aalibrary_file_metadata`
            database table.
    """

    datetime_format = "%Y-%m-%d %H:%M:%S"

    # Get the current user's email
    email = get_current_gcp_user_email()

    # get the survey datetime.
    file_datetime = datetime.strptime(
        rf.get_file_datetime_str(), datetime_format
    )

    # Take "now" once, in UTC, so that the creation and deletion datetimes
    # share one clock and are exactly 90 days apart.
    now_utc = datetime.now(timezone.utc)
    deletion_datetime = (now_utc + timedelta(days=90)).strftime(
        datetime_format
    )

    metadata_json = {
        "FILE_NAME": rf.raw_file_name,
        "DATE_CREATED": now_utc.strftime(datetime_format),
        "UPLOADED_BY": email,
        "AALIBRARY_VERSION": aalibrary_version,
        "ECHOPYPE_VERSION": echopype.__version__,
        "PYTHON_VERSION": sys.version.split(" ")[0],
        "NUMPY_VERSION": np.version.version,
        # maybe just add in echopype's reqs.
        # pip lock file - for current environment
        "NCEI_CRUISE_ID": rf.survey_name,
        "NCEI_URI": rf.raw_file_s3_object_key,
        "GCP_BUCKET_NAME": rf.gcp_bucket_name,
        "GCP_URI": rf.raw_gcp_storage_bucket_location,
        "FILE_DATETIME": file_datetime,
        # TODO: add timezone info if possible in rf object.
        "FILE_DATETIME_TIMEZONE": None,
        "DELETION_DATETIME": deletion_datetime,
        "ICES_CODE": rf.ices_code,
        "SHIP_NAME": rf.ship_name,
        "SURVEY_NAME": rf.survey_name,
        "ECHOSOUNDER": rf.echosounder,
        "EXISTS_IN_NCEI": rf.raw_file_exists_in_ncei,
        "EXISTS_IN_GCP": rf.raw_file_exists_in_gcp,
        "EXISTS_IN_OMAO": rf.raw_file_exists_in_omao,
        "IDX_FILE_NCEI_URI": rf.idx_file_s3_object_key,
        "IDX_FILE_GCP_URI": rf.idx_gcp_storage_bucket_location,
        "IDX_FILE_OMAO_URI": rf.idx_omao_file_path,
        "IDX_FILE_EXISTS_IN_NCEI": rf.idx_file_exists_in_ncei,
        "IDX_FILE_EXISTS_IN_GCP": rf.idx_file_exists_in_gcp,
        "IDX_FILE_EXISTS_IN_OMAO": rf.idx_file_exists_in_omao,
        "BOT_FILE_NCEI_URI": rf.bot_file_s3_object_key,
        "BOT_FILE_GCP_URI": rf.bot_gcp_storage_bucket_location,
        "BOT_FILE_OMAO_URI": rf.bot_omao_file_path,
        "BOT_FILE_EXISTS_IN_NCEI": rf.bot_file_exists_in_ncei,
        "BOT_FILE_EXISTS_IN_GCP": rf.bot_file_exists_in_gcp,
        "BOT_FILE_EXISTS_IN_OMAO": rf.bot_file_exists_in_omao,
        # TODO: add the info for each of the fields below if possible in rf
        # object. Until then they are uploaded as nulls.
        "METADATA_JSON_FILE_GCP_URI": None,
        "METADATA_JSON_FILE_NCEI_URI": None,
        "METADATA_JSON_FILE_EXISTS_IN_GCP": None,
        "METADATA_JSON_FILE_EXISTS_IN_NCEI": None,
        "TUGBOAT_METADATA_JSON_SUBMISSION_STATUS": None,
        "FM_FILE_TYPE": None,
        "TRANSECT": None,
    }

    aalibrary_metadata_df = pd.json_normalize(metadata_json)
    # make sure data types are conserved before upload to BigQuery.
    for column in ("DATE_CREATED", "FILE_DATETIME", "DELETION_DATETIME"):
        aalibrary_metadata_df[column] = pd.to_datetime(
            aalibrary_metadata_df[column], format=datetime_format
        )

    if debug:
        print(aalibrary_metadata_df)
        logging.debug(aalibrary_metadata_df)

    return aalibrary_metadata_df

delay_file_deletion(file_name='', days=0, gcp_project_id=config.get_current_gcp_project_id())

Delays a file's DELETION_DATETIME by the number of days specified.

Parameters:

Name Type Description Default
file_name str

The unique file name. Defaults to "".

''
days int

The number of days by which to delay the file's deletion. Defaults to 0.

0
gcp_project_id str

The GCP project ID. Defaults to the current GCP project ID obtained through the config module.

get_current_gcp_project_id()
Source code in src\aalibrary\metadata.py
def delay_file_deletion(
    file_name: str = "",
    days: int = 0,
    gcp_project_id: str = None,
):
    """Delays a file's DELETION_DATETIME by the number of days specified.

    Failures while running the update are reported with a printed message
    rather than raised.

    Args:
        file_name (str, optional): The unique file name. Defaults to "".
        days (int, optional): The number of days by which to delay the file's
            deletion. Defaults to 0.
        gcp_project_id (str, optional): The GCP project ID.
            Defaults to None, in which case the current GCP project ID is
            looked up through the config module at call time.
    """
    # Resolve the default on every call instead of once at import time, so
    # that switching environments through the config module takes effect.
    if gcp_project_id is None:
        gcp_project_id = config.get_current_gcp_project_id()

    # Get the file deletion datetime.
    file_deletion_datetime = get_deletion_datetime_of_file(
        file_name=file_name, gcp_project_id=gcp_project_id
    )
    # Extend it by the number of days specified.
    file_deletion_datetime = file_deletion_datetime + timedelta(days=days)

    # Values go in as query parameters so that quotes in a file name can
    # neither break nor alter the statement. Table names cannot be
    # parameterized, hence the project ID stays in the f-string.
    query = f"""UPDATE `{gcp_project_id}.metadata.aalibrary_file_metadata`
    SET DELETION_DATETIME = @deletion_datetime
    WHERE FILE_NAME = @file_name"""
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter(
                "deletion_datetime", "DATETIME", file_deletion_datetime
            ),
            bigquery.ScalarQueryParameter("file_name", "STRING", file_name),
        ]
    )
    gcp_bq_client = bigquery.Client(location="US")
    try:
        job = gcp_bq_client.query(query, job_config=job_config)
        job.result()
        print(
            f"File DELETION_DATETIME delayed by {days} day(s) to"
            f" {file_deletion_datetime}"
        )
    except Exception as e:
        # Best effort: report the failure to the user instead of raising.
        print(f"Could not update DELETION_DATETIME due to:\n{e}")
        return

get_current_gcp_user_email()

Gets the current gcloud user's email.

Returns:

Name Type Description
str str

A string containing the current gcloud user's email.

Source code in src\aalibrary\metadata.py
def get_current_gcp_user_email() -> str:
    """Asks the `gcloud` CLI which account is currently configured.

    Returns:
        str: The standard output of `gcloud config get-value account` with
            all newline characters removed.
    """

    # The command only goes through the shell on Windows (presumably because
    # `gcloud` is a .cmd shim there -- confirm); elsewhere it is run directly.
    run_through_shell = platform.system() == "Windows"
    completed = subprocess.run(
        ["gcloud", "config", "get-value", "account"],
        shell=run_through_shell,
        capture_output=True,
        text=True,
        check=False,
    )
    return completed.stdout.replace("\n", "")

get_deletion_datetime_of_file(file_name='', gcp_project_id=config.get_current_gcp_project_id())

Gets the DELETION_DATETIME of a file. Returns a datetime object.

Parameters:

Name Type Description Default
file_name str

The file name. Defaults to "".

''
gcp_project_id str

The GCP project ID. Defaults to the current GCP project ID obtained through the config module.

get_current_gcp_project_id()

Returns: datetime: The DELETION_DATETIME of the file as a datetime object.

Source code in src\aalibrary\metadata.py
def get_deletion_datetime_of_file(
    file_name: str = "",
    gcp_project_id: str = None,
) -> datetime:
    """Gets the DELETION_DATETIME of a file. Returns a datetime object.

    Args:
        file_name (str, optional): The file name. Defaults to "".
        gcp_project_id (str, optional): The GCP project ID.
            Defaults to None, in which case the current GCP project ID is
            looked up through the config module at call time.

    Returns:
        datetime: The DELETION_DATETIME of the file as a datetime object.

    Raises:
        IndexError: If no metadata row exists for `file_name`.
    """

    # Resolve the default on every call instead of once at import time, so
    # that switching environments through the config module takes effect.
    if gcp_project_id is None:
        gcp_project_id = config.get_current_gcp_project_id()

    # The file name is passed as a query parameter so that quotes in it can
    # neither break nor alter the statement. Table names cannot be
    # parameterized, hence the project ID stays in the f-string.
    query = f"""SELECT DELETION_DATETIME
    FROM `{gcp_project_id}.metadata.aalibrary_file_metadata`
    WHERE FILE_NAME = @file_name"""
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("file_name", "STRING", file_name),
        ]
    )
    gcp_bq_client = bigquery.Client(location="US")
    job = gcp_bq_client.query(query, job_config=job_config)
    deletion_values = (
        job.result().to_dataframe()["DELETION_DATETIME"].tolist()
    )
    if not deletion_values:
        # Same exception type as indexing the empty list would give, but
        # with a message that names the actual problem.
        raise IndexError(
            f"No metadata entry found for file `{file_name}` in project"
            f" `{gcp_project_id}`."
        )

    # Round-trip through str so a plain `datetime` is returned whatever
    # timestamp type the dataframe produced.
    file_deletion_datetime = datetime.strptime(
        str(deletion_values[0]), "%Y-%m-%d %H:%M:%S"
    )
    return file_deletion_datetime

get_metadata_in_df_format()

Retrieves the metadata associated with all objects in GCP in DataFrame format.

Source code in src\aalibrary\metadata.py
def get_metadata_in_df_format():
    """Placeholder for retrieving the metadata associated with all objects in
    GCP in DataFrame format.

    Not implemented yet: the function takes no arguments and returns None.
    """
    return None

upload_ncei_metadata_df_to_bigquery(ship_name='', survey_name='', download_location='', s3_bucket=None)

Finds the metadata obtained from a survey on NCEI, and uploads it to the ncei_cruise_metadata database table in BigQuery. Also handles any extra database entries that are needed, such as uploading to the ncei_instrument_metadata table when necessary.

Parameters:

Name Type Description Default
ship_name str

The ship name associated with this survey. Defaults to "".

''
survey_name str

The survey name/identifier. Defaults to "".

''
download_location str

The local download location for the file. Defaults to "".

''
s3_bucket resource

The bucket resource object. Defaults to None.

None
Source code in src\aalibrary\metadata.py
def upload_ncei_metadata_df_to_bigquery(
    ship_name: str = "",
    survey_name: str = "",
    download_location: str = "",
    s3_bucket: boto3.resource = None,
):
    """Uploads a survey's NCEI metadata to the `ncei_cruise_metadata` table
    in BigQuery.

    The survey is first checked for a Tugboat metadata JSON file. If one
    exists, it is downloaded to `download_location` and passed on to
    `_parse_and_upload_ncei_survey_level_metadata`, which takes care of the
    parsing and the upload (including any extra entries that are needed,
    such as those for `ncei_instrument_metadata`). If no metadata file
    exists, nothing is downloaded or uploaded.

    Args:
        ship_name (str, optional): The ship name associated with this survey.
            Defaults to "".
        survey_name (str, optional): The survey name/identifier.
            Defaults to "".
        download_location (str, optional): The local download location for the
            file. Defaults to "".
        s3_bucket (boto3.resource, optional): The bucket resource object.
            Defaults to None.
    """

    # Holds the metadata file's location as a string when the file exists,
    # and None otherwise.
    tugboat_metadata_location = (
        check_if_tugboat_metadata_json_exists_in_survey(
            ship_name=ship_name, survey_name=survey_name, s3_bucket=s3_bucket
        )
    )

    # No metadata file for this survey: there is nothing to upload.
    if not tugboat_metadata_location:
        return

    # TODO: Download all metadata files to local for download? Even
    # calibration files?
    # Fetch the main metadata file, then parse it and upload it to GCP.
    s3_bucket.download_file(tugboat_metadata_location, download_location)
    _parse_and_upload_ncei_survey_level_metadata(
        survey_name=survey_name, file_location=download_location
    )

queries

This script contains classes that have SQL queries used for interaction with the metadata database in BigQuery.

Classes:

Name Description
MetadataQueries

This class contains queries related to the upload, alteration, and retrieval of metadata from our BigQuery instance.

MetadataQueries dataclass

This class contains queries related to the upload, alteration, and retrieval of metadata from our BigQuery instance.

Source code in src\aalibrary\queries.py
@dataclass
class MetadataQueries:
    """Holds the SQL queries used to upload, alter, and retrieve metadata
    from our BigQuery instance.

    Ready-made queries are stored as string fields. Both of them target the
    `aalibrary_file_metadata` table of the `ggn-nmfs-aa-dev-1` project. The
    methods are placeholders for queries that depend on an argument and do
    not build a real query yet.
    """

    # Selects every record of the file metadata table.
    get_all_aalibrary_metadata_records: str = """
    SELECT * FROM `ggn-nmfs-aa-dev-1.metadata.aalibrary_file_metadata`"""

    # TODO for mike ryan
    # Selects the `ship_name` column of the file metadata table.
    get_all_possible_ship_names_from_database: str = """
    SELECT ship_name from `ggn-nmfs-aa-dev-1.metadata.aalibrary_file_metadata`
    """

    def get_all_surveys_associated_with_a_ship_name(self, ship_name: str = ""):
        """Placeholder for the query listing the surveys of a ship.

        Args:
            ship_name (str, optional): The ship name. Currently unused.
                Defaults to "".

        Returns:
            str: An empty string, since the query has not been written yet.
        """
        surveys_query: str = ""
        return surveys_query

    def get_all_echosounders_used_in_a_survey(self, survey: str = ""):
        """Placeholder for the query listing the echosounders of a survey.

        Not implemented yet: `survey` is unused and `None` is returned.
        """

    def get_all_netcdf_files_in_database(self):
        """Placeholder for the query listing the netcdf files in the
        database.

        Not implemented yet: `None` is returned.
        """

quick_test

Contains quick tests for the API to verify that the connections are working as intended. Also checks to see if a raw file can be downloaded.

Functions:

Name Description
init_test_folder

Creates a test folder in the current directory for quick tests and downloads test files.

start

Runs quick tests to verify that the connections are working as intended and checks to see if a raw file can be downloaded.

init_test_folder(test_folder_name='test_data_dir')

Creates a test folder in the current directory for quick tests and downloads test files.

Parameters:

Name Type Description Default
test_folder_name str

The name of the folder you want to download test files into. Defaults to "test_data_dir".

'test_data_dir'
Source code in src\aalibrary\quick_test.py
def init_test_folder(test_folder_name: str = "test_data_dir"):
    """Creates a test folder in the current directory for quick tests and
    downloads test files.

    The folder is created inside the current working directory (it is fine
    if it already exists). Four raw files from the `RL2107` survey of the
    `Reuben_Lasker` are then downloaded from NCEI into it, followed by the
    survey's EK80 metadata JSON file. Nothing is uploaded to GCP.

    Args:
        test_folder_name (str, optional): The name of the folder you want to
            download test files into. Defaults to "test_data_dir".
    """

    # NOTE: `test_folder_name` is used as passed in. It used to be
    # overwritten with "test_data_dir" here, which silently ignored the
    # caller's choice of folder.
    test_folder_directory = os.path.join(os.getcwd(), test_folder_name)
    # Normalize the path to ensure it ends with a separator
    test_folder_directory = os.path.normpath(test_folder_directory) + os.sep
    # Create test folder
    print(
        f"Creating test folder '{test_folder_name}' in"
        f" `{test_folder_directory}`..."
    )
    os.makedirs(test_folder_directory, exist_ok=True)
    print(
        f"Test folder '{test_folder_name}' created successfully in"
        f" `{test_folder_directory}`."
    )

    print("Downloading test files...")
    # All raw test files come from the same ship/survey/echosounder, so only
    # the file name changes from one download to the next.
    raw_test_file_names = (
        "2107RL_FM-D20210804-T214458.raw",
        "2107RL_FM-D20210808-T033245.raw",
        "2107RL_FM-D20211012-T022341.raw",
        "2107RL_CW-D20211001-T132449.raw",
    )
    for raw_file_name in raw_test_file_names:
        ingestion.download_raw_file_from_ncei(
            file_name=raw_file_name,
            file_type="raw",
            ship_name="Reuben_Lasker",
            survey_name="RL2107",
            echosounder="EK80",
            file_download_directory=test_folder_directory,
            upload_to_gcp=False,
            debug=False,
        )

    # The survey-level metadata file is fetched straight from AWS.
    aalibrary.utils.ncei_utils.download_single_file_from_aws(
        file_url=(
            "data/raw/Reuben_L"
            "asker/RL2107/metadata/RL2107_EK80_WCSD_EK80-metadata.json"
        ),
        download_location=os.path.join(
            test_folder_directory, "RL2107_EK80_WCSD_EK80-metadata.json"
        ),
    )
    print("Test files downloaded successfully.")

start()

Runs quick tests to verify that the connections are working as intended and checks to see if a raw file can be downloaded. Can also be used to set up the test folder by calling init_test_folder().

Source code in src\aalibrary\quick_test.py
def start():
    """Runs quick tests to verify that the connections are working as intended
    and checks to see if a raw file can be downloaded.
    Can also be used to set up the test folder by calling `init_test_folder()`.

    The tests run in order: `gcloud` setup, GCP connection, S3 connection,
    and an NCEI raw file download. Each test prints its name followed by
    `PASSED`, or reports the error on stderr. A failing test does not stop
    the remaining tests from running.
    """

    # `gcloud` setup test
    try:
        print("`gcloud` SETUP TEST...", end="")
        curr_user_email = metadata.get_current_gcp_user_email()
        # Raise explicitly rather than `assert`: assert statements are
        # removed when Python runs with `-O`, which would skip these checks.
        if curr_user_email == "":
            raise RuntimeError(
                "Please login to `gcloud` using "
                "`gcloud auth login --no-browser`"
            )
        if echopype.__version__ == "":
            raise RuntimeError(
                "Please install requirements using `pip install -r "
                "src/aalibrary/requirements.txt`, or you can try "
                "reinstalling `aalibrary` to automatically take care of "
                "dependencies."
            )
        print("PASSED")
    except Exception as e:
        print(
            f"`gcloud` SETUP TEST FAILED DUE TO THE FOLLOWING ERROR:\n{e}",
            file=sys.stderr,
        )

    # CONNECTION TEST: set up storage objects
    try:
        print("GCP CONNECTION TEST...", end="")
        _, _, _ = cloud_utils.setup_gcp_storage_objs()
        print("PASSED")
    except Exception as e:
        print(
            f"CONNECTION TEST TO GCP FAILED DUE TO THE FOLLOWING ERROR:\n{e}",
            file=sys.stderr,
        )

    try:
        print("S3 CONNECTION TEST...", end="")
        _, _, _ = cloud_utils.create_s3_objs()
        print("PASSED")
    except Exception as e:
        print(
            f"CONNECTION TEST TO s3 FAILED DUE TO THE FOLLOWING ERROR:\n{e}",
            file=sys.stderr,
        )

    # FUNCTIONAL TEST: download a raw file
    file_name = "2107RL_CW-D20210916-T165047.raw"
    ship_name = "Reuben_Lasker"
    survey_name = "RL2107"
    echosounder = "EK80"
    data_source = "TEST"
    file_download_directory = "."
    file_download_location = "./" + file_name
    idx_file_download_location = file_download_location.replace(".raw", ".idx")

    try:
        print("NCEI DOWNLOAD TEST...", end="")
        ingestion.download_raw_file(
            file_name=file_name,
            ship_name=ship_name,
            survey_name=survey_name,
            echosounder=echosounder,
            data_source=data_source,
            file_download_directory=file_download_directory,
            debug=False,
        )
        print("PASSED")
    except Exception as e:
        print(
            f"NCEI DOWNLOAD TEST FAILED DUE TO THE FOLLOWING ERROR:\n{e}",
            file=sys.stderr,
        )
    else:
        # Clean up the downloaded files once the test has passed. This is
        # kept out of the `try` above so that a cleanup problem is not
        # reported as a download failure after `PASSED` was printed, and so
        # that one missing file does not prevent removing the other.
        for downloaded_file in (
            file_download_location,
            idx_file_download_location,
        ):
            try:
                os.remove(downloaded_file)
            except FileNotFoundError:
                # Nothing to clean up for this file.
                continue
            except OSError as e:
                print(
                    f"COULD NOT REMOVE `{downloaded_file}` DUE TO THE"
                    f" FOLLOWING ERROR:\n{e}",
                    file=sys.stderr,
                )