class RawFile:
    """Describe one raw echosounder file and everything derived from it.

    All arguments are passed as keyword arguments and are written straight
    into the instance ``__dict__``; anything that is not passed falls back to
    the class-level default below. Construction then derives file names,
    download paths, storage locations and existence flags by calling the
    project's cloud helper functions (S3 / GCP), so it presumably requires
    network access and credentials -- confirm against those helpers.

    Args:
        file_name (str): The name of the file, including the extension.
        file_type (str): The type of the file (ex. "raw", "idx", "bot",
            "netcdf").
        ship_name (str): The name of the ship, will get normalized.
        survey_name (str): The name of the survey.
        echosounder (str): The name of the echosounder.
        data_source (str): The name of the data source.
        file_download_directory (str): The directory you want to download the
            file to. When omitted (or empty / "./"), the current working
            directory is used.
        is_metadata (bool): Whether the file is a metadata file or not,
            defaults to False.
        upload_to_gcp (bool): Whether to upload the file to gcp after
            downloading it, defaults to False.
        debug (bool): Whether to log debug messages or not, defaults to
            False.
        gcp_project_id (str): The gcp project id to use for uploading the file
            to gcp, defaults to None (will use the environment variable
            ``AALIBRARY_GCP_PROJECT_ID``).
        gcp_bucket_name (str): The gcp bucket name to use for uploading the
            file to gcp, defaults to None (will use the environment variable
            ``AALIBRARY_GCP_BUCKET_NAME``).
        gcp_bucket (storage.Client.bucket): The gcp bucket object to use for
            uploading the file to gcp, defaults to None (will create one using
            gcp_bucket_name).
        s3_resource (boto3.resource): The s3 resource object to use for
            checking if the file exists in s3, defaults to None (will create
            one).
    """

    file_name: Union[str, None] = None
    file_type: Union[str, None] = None
    ship_name: Union[str, None] = None
    survey_name: Union[str, None] = None
    echosounder: Union[str, None] = None
    data_source: Union[str, None] = None
    file_download_directory: str = ""
    is_metadata: bool = False
    upload_to_gcp: bool = False
    debug: bool = False
    gcp_project_id: Union[str, None] = os.getenv("AALIBRARY_GCP_PROJECT_ID")
    gcp_bucket_name: Union[str, None] = os.getenv("AALIBRARY_GCP_BUCKET_NAME")
    gcp_bucket: storage.Client.bucket = None
    s3_resource: boto3.resource = None
    # Class-level fallback only. `_get_datagram_data` always gives every
    # instance its own dict so this shared object is never handed out.
    datagram_dict: dict = {}
    # Get all valid and normalized ICES ship names (evaluated once, when the
    # class is defined).
    valid_ICES_ship_names = ices_ship_names.get_all_ices_ship_names(
        normalize_ship_names=True
    )

    def __init__(self, **kwargs):
        """Store the keyword arguments and derive all other attributes.

        Validation runs last, on the already-normalized values.

        Raises:
            AssertionError: If one of the provided values is invalid.
        """
        self.__dict__.update(kwargs)
        self._handle_paths()
        self._create_vars_for_use_later()
        self._create_cruise_level_metadata_vars()
        self._get_datagram_data()
        self._create_download_directories_if_not_exists()
        self._check_for_assertion_errors()

    def __repr__(self):
        """Return the pretty-printed instance ``__dict__``."""
        return pprint.pformat(self.__dict__, indent=4)

    def __str__(self):
        """Return the pretty-printed instance ``__dict__``."""
        return pprint.pformat(self.__dict__, indent=4)

    # ------------------------------------------------------------------
    # Paths
    # ------------------------------------------------------------------
    def _handle_paths(self):
        """Normalize `file_download_directory`, defaulting to the cwd.

        A directory that was not passed at all, or that is empty, "./" or
        ".\\" (also after normalization), is replaced by the current working
        directory.
        """
        directory_was_given = "file_download_directory" in self.__dict__

        # Normalize paths
        if directory_was_given:
            self.file_download_directory = (
                os.path.normpath(self.file_download_directory) + os.sep
            )
            if self.debug:
                logging.debug(
                    "normalized file download directory = %s",
                    self.file_download_directory,
                )

        # Take care of an empty file_download_directory and treat it like the
        # cwd. The "was it given" test comes first so that an omitted
        # directory never triggers a KeyError on the instance dict.
        if (not directory_was_given) or self.file_download_directory in (
            "",
            "./",
            ".\\",
        ):
            self.file_download_directory = os.path.normpath(
                os.getcwd() + os.sep
            )
            if self.debug:
                logging.debug(
                    "converted file_download_directory to directory %s",
                    self.file_download_directory,
                )

    def _create_download_directories_if_not_exists(self):
        """Create the download directory (path) if it doesn't exist."""
        if "file_download_directory" in self.__dict__:
            if not os.path.exists(self.file_download_directory):
                # exist_ok guards against another process creating the
                # directory between the check above and this call.
                os.makedirs(self.file_download_directory, exist_ok=True)

    # ------------------------------------------------------------------
    # Derived variables
    # ------------------------------------------------------------------
    def _create_vars_for_use_later(self):
        """Creates vars that will add value and can be utilized later.

        The helpers are called in a fixed order because later ones read
        attributes set by earlier ones (file names -> locations -> existence
        checks).
        """
        # Handle undefined GCP project id and bucket name by using environment
        # variables.
        if self.gcp_project_id is None:
            self.gcp_project_id = os.getenv("AALIBRARY_GCP_PROJECT_ID")
        if self.gcp_bucket_name is None:
            self.gcp_bucket_name = os.getenv("AALIBRARY_GCP_BUCKET_NAME")

        self._set_datetime_vars()
        self._set_ship_name_vars()
        self._set_cloud_connection_vars()
        self._set_file_name_and_path_vars()
        self._set_remote_location_vars()
        self._set_remote_existence_vars()

    def _set_datetime_vars(self):
        """Set the date/time attributes parsed from the file name, if any."""
        datetime_dict = get_parsed_datetime_from_filename(
            file_name=self.file_name, return_as_dict=True
        )
        # No datetime in the file name: the attributes are left undefined and
        # the public time getters report that via their fallbacks.
        if datetime_dict is None:
            return
        self.year_str = datetime_dict["year"]
        self.month_str = datetime_dict["month"]
        self.date_str = datetime_dict["date"]
        self.year = int(self.year_str)
        self.month = int(self.month_str)
        self.date = int(self.date_str)
        self.hour_str = datetime_dict["hour"]
        self.minute_str = datetime_dict["minute"]
        self.second_str = datetime_dict["second"]
        self.hour = int(self.hour_str)
        self.minute = int(self.minute_str)
        self.second = int(self.second_str)

    def _set_ship_name_vars(self):
        """Normalize the ship name and look up its ICES code."""
        # Normalize ship name
        if "ship_name" in self.__dict__:
            self.ship_name_unnormalized = self.ship_name
            self.ship_name = utils.helpers.normalize_ship_name(self.ship_name)
            # Get the NCEI formatted name if the data source is NCEI.
            # This is basically a spell checker for NCEI.
            if self.data_source == "NCEI":
                self.ship_name_unnormalized = (
                    utils.ncei_utils.get_closest_ncei_formatted_ship_name(
                        ship_name=self.ship_name
                    )
                )
        # If the ship name exists in ICES, get the ICES code for it.
        if self.ship_name in self.valid_ICES_ship_names:
            self.ices_code = ices_ship_names.get_ices_code_from_ship_name(
                ship_name=self.ship_name, is_normalized=True
            )
        else:
            self.ices_code = ""

    def _set_cloud_connection_vars(self):
        """Create the GCP and S3 connection objects if they don't exist.

        Each group is (re)built unless all of its members were passed in as
        keyword arguments, i.e. are present in the instance ``__dict__``.
        """
        self.s3_bucket_name = "noaa-wcsd-pds"
        if (
            ("gcp_bucket" not in self.__dict__)
            or ("gcp_bucket_name" not in self.__dict__)
            or ("gcp_stor_client" not in self.__dict__)
        ):
            self.gcp_stor_client, self.gcp_bucket_name, self.gcp_bucket = (
                utils.cloud_utils.setup_gcp_storage_objs(
                    project_id=self.gcp_project_id,
                    gcp_bucket_name=self.gcp_bucket_name,
                )
            )
        if (
            ("s3_resource" not in self.__dict__)
            or ("s3_client" not in self.__dict__)
            or ("s3_bucket" not in self.__dict__)
        ):
            self.s3_client, self.s3_resource, self.s3_bucket = (
                utils.cloud_utils.create_s3_objs()
            )

    def _set_file_name_and_path_vars(self):
        """Create the sibling file names and their local download paths."""
        # Create file names for all other files that can exist
        self.raw_file_name = self.file_name
        # NOTE: everything after the FIRST dot is dropped here, whereas the
        # sibling names below only replace the LAST extension.
        self.file_name_wo_extension = self.file_name.split(".")[0]
        stem = self.file_name.rpartition(".")[0]
        self.idx_file_name = stem + ".idx"
        self.bot_file_name = stem + ".bot"
        self.netcdf_file_name = stem + ".nc"

        # Create download paths for all four types of files
        self.raw_file_download_path = self._download_path_for(self.file_name)
        self.idx_file_download_path = self._download_path_for(
            self.idx_file_name
        )
        self.bot_file_download_path = self._download_path_for(
            self.bot_file_name
        )
        self.netcdf_file_download_path = self._download_path_for(
            self.netcdf_file_name
        )

    def _set_remote_location_vars(self):
        """Create the NCEI, GCP and OMAO locations for every file type."""
        raw = self.raw_file_name
        idx = self.idx_file_name
        bot = self.bot_file_name
        netcdf = self.netcdf_file_name

        # Create all possible NCEI urls that can exist
        # We have to use the un-normalized version of the ship name since
        # NCEI does not normalize it.
        self.raw_file_ncei_url = self._ncei_url_for(raw)
        self.idx_file_ncei_url = self._ncei_url_for(idx)
        self.bot_file_ncei_url = self._ncei_url_for(bot)
        # NCEI does not store netcdf files, so we will not be creating a url
        # for them.

        # Create all GCP Storage bucket locations for each possible file
        self.raw_gcp_storage_bucket_location = self._gcp_location_for(
            raw, "raw"
        )
        self.idx_gcp_storage_bucket_location = self._gcp_location_for(
            idx, "idx"
        )
        self.bot_gcp_storage_bucket_location = self._gcp_location_for(
            bot, "bot"
        )
        self.netcdf_gcp_storage_bucket_location = self._gcp_location_for(
            netcdf, "netcdf"
        )

        # Create all OMAO storage locations for each file.
        self.raw_omao_file_path = self._omao_path_for(raw)
        self.idx_omao_file_path = self._omao_path_for(idx)
        self.bot_omao_file_path = self._omao_path_for(bot)
        self.netcdf_omao_file_path = self._omao_path_for(netcdf)

        # Create object keys for NCEI
        self.raw_file_s3_object_key = self._s3_object_key_for(raw)
        self.idx_file_s3_object_key = self._s3_object_key_for(idx)
        self.bot_file_s3_object_key = self._s3_object_key_for(bot)
        # NCEI does not have netcdf files, so we will not create object keys.

    def _set_remote_existence_vars(self):
        """Record whether each file exists in NCEI, GCP and OMAO."""
        # Check if the file(s) exist in NCEI
        self.raw_file_exists_in_ncei = self._exists_in_ncei(
            self.raw_file_s3_object_key
        )
        self.idx_file_exists_in_ncei = self._exists_in_ncei(
            self.idx_file_s3_object_key
        )
        self.bot_file_exists_in_ncei = self._exists_in_ncei(
            self.bot_file_s3_object_key
        )
        # NCEI does not store netcdf files, so we will not be checking.

        # Check if the file(s) exist in GCP
        self.raw_file_exists_in_gcp = self._exists_in_gcp(
            self.raw_gcp_storage_bucket_location
        )
        self.idx_file_exists_in_gcp = self._exists_in_gcp(
            self.idx_gcp_storage_bucket_location
        )
        self.bot_file_exists_in_gcp = self._exists_in_gcp(
            self.bot_gcp_storage_bucket_location
        )
        self.netcdf_file_exists_in_gcp = self._exists_in_gcp(
            self.netcdf_gcp_storage_bucket_location
        )

        # Vars for omao data lake existence.
        # TODO: implement after we have access to the OMAO data lake and can
        # figure out how to check for files in it.
        self.raw_file_exists_in_omao = False
        self.idx_file_exists_in_omao = False
        self.bot_file_exists_in_omao = False
        self.netcdf_file_exists_in_omao = False

    # -- small per-file helpers used by the methods above ---------------
    def _download_path_for(self, file_name):
        """Return the normalized local download path for `file_name`."""
        return os.path.normpath(
            os.sep.join([self.file_download_directory, file_name])
        )

    def _ncei_url_for(self, file_name):
        """Return the NCEI url for `file_name` (un-normalized ship name)."""
        return utils.helpers.create_ncei_url_from_variables(
            file_name=file_name,
            ship_name=self.ship_name_unnormalized,
            survey_name=self.survey_name,
            echosounder=self.echosounder,
        )

    def _gcp_location_for(self, file_name, file_type):
        """Return the GCP storage bucket location for `file_name`."""
        return utils.helpers.parse_correct_gcp_storage_bucket_location(
            file_name=file_name,
            file_type=file_type,
            ship_name=self.ship_name,
            survey_name=self.survey_name,
            echosounder=self.echosounder,
            data_source=self.data_source,
            debug=self.debug,
        )

    def _omao_path_for(self, file_name):
        """Return the OMAO file path for `file_name`."""
        return utils.helpers.create_omao_file_path_from_variables(
            file_name=file_name,
            ship_name=self.ship_name,
            survey_name=self.survey_name,
            echosounder=self.echosounder,
        )

    def _s3_object_key_for(self, file_name):
        """Return the S3 object key for `file_name` (un-normalized ship)."""
        return utils.cloud_utils.get_object_key_for_s3(
            file_name=file_name,
            ship_name=self.ship_name_unnormalized,
            survey_name=self.survey_name,
            echosounder=self.echosounder,
        )

    def _exists_in_ncei(self, object_key):
        """Return the S3 existence check result for `object_key`."""
        return utils.cloud_utils.check_if_file_exists_in_s3(
            object_key=object_key,
            s3_resource=self.s3_resource,
            s3_bucket_name=self.s3_bucket_name,
        )

    def _exists_in_gcp(self, file_path):
        """Return the GCP existence check result for `file_path`."""
        return utils.cloud_utils.check_if_file_exists_in_gcp(
            bucket=self.gcp_bucket,
            file_path=file_path,
        )

    # ------------------------------------------------------------------
    # Cruise level metadata
    # ------------------------------------------------------------------
    def _create_cruise_level_metadata_vars(self):
        """Creates cruise level metadata variables that we can use to access
        cruise level metadata files.

        When no metadata json uri is found, the name and GCP uri are set to
        None and both existence flags to False.
        """
        # NOTE(review): the helper's `s3_bucket` parameter receives
        # `self.s3_resource`, not `self.s3_bucket` -- confirm this is what
        # the helper expects.
        self.cruise_level_metadata_json_file_ncei_uri = (
            check_if_tugboat_metadata_json_exists_in_survey(
                ship_name=self.ship_name_unnormalized,
                survey_name=self.survey_name,
                s3_bucket=self.s3_resource,
            )
        )
        if self.cruise_level_metadata_json_file_ncei_uri is None:
            self.cruise_level_metadata_file_name = None
            self.cruise_level_metadata_json_file_exists_in_ncei = False
            self.cruise_level_metadata_file_gcp_uri = None
            self.cruise_level_metadata_exists_in_gcp = False
            return

        # The file name is the last path component of the uri.
        self.cruise_level_metadata_file_name = (
            self.cruise_level_metadata_json_file_ncei_uri.split("/")[-1]
        )
        self.cruise_level_metadata_json_file_exists_in_ncei = True
        self.cruise_level_metadata_file_gcp_uri = (
            utils.helpers.parse_correct_gcp_storage_bucket_location(
                file_name=self.cruise_level_metadata_file_name,
                file_type="json",
                ship_name=self.ship_name,
                survey_name=self.survey_name,
                echosounder=self.echosounder,
                data_source=self.data_source,
                is_survey_metadata=True,
                debug=self.debug,
            )
        )
        self.cruise_level_metadata_exists_in_gcp = (
            utils.cloud_utils.check_if_file_exists_in_gcp(
                bucket=self.gcp_bucket,
                file_path=self.cruise_level_metadata_file_gcp_uri,
            )
        )

    # ------------------------------------------------------------------
    # Datagram data
    # ------------------------------------------------------------------
    def _get_datagram_data(self):
        """Populate `datagram_dict` and the attributes derived from it.

        If the data source is NCEI and the raw file exists there, the
        datagram data is streamed from S3. For any other case nothing is
        read: the instance keeps a `datagram_dict` passed by the caller, or
        gets a fresh empty dict. `datagram_timestamp`,
        `file_datetime_timezone` and `transect_name` are always set, to None
        when the datagram does not carry them.
        """
        if self.data_source == "NCEI" and self.raw_file_exists_in_ncei:
            self.datagram_dict = stream_datagram_dict_from_ncei(
                s3_object_key=self.raw_file_s3_object_key
            )
        elif "datagram_dict" not in self.__dict__:
            # Give the instance its own dict instead of exposing the shared
            # class-level default, which every instance would mutate.
            self.datagram_dict = {}

        # Assign the relevant metadata from the datagram to the RawFile
        # object as attributes if they exist in the datagram.
        if "timestamp" in self.datagram_dict:
            self.datagram_timestamp = self.datagram_dict["timestamp"]
            self.file_datetime_timezone = str(
                self.datagram_timestamp.tzinfo
            )
        else:
            self.datagram_timestamp = None
            self.file_datetime_timezone = None
        if "transect_name" in self.datagram_dict:
            self.transect_name = self.datagram_dict["transect_name"]
        else:
            self.transect_name = None

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------
    @staticmethod
    def _require(condition, message):
        """Raise AssertionError(message) unless `condition` is truthy.

        Used instead of the `assert` statement so that validation still runs
        when Python is started with `-O`.
        """
        if not condition:
            raise AssertionError(message)

    def _check_for_assertion_errors(self):
        """Checks for errors in each variable in our self.__dict__.

        Only values present in the instance ``__dict__`` are checked.

        Raises:
            AssertionError: If any checked value is invalid.
        """
        if "file_name" in self.__dict__:
            self._require(
                self.file_name != "",
                "Please provide a valid file name with the file extension"
                " (ex. `2107RL_CW-D20210813-T220732.raw`)",
            )
        if "file_type" in self.__dict__:
            self._require(
                self.file_type != "", "Please provide a valid file type."
            )
            self._require(
                self.file_type in config.VALID_FILETYPES,
                "Please provide a valid file type (extension) "
                f"from the following: {config.VALID_FILETYPES}",
            )
        if "ship_name" in self.__dict__:
            ship_name_format_message = (
                "Please provide a valid ship name "
                "(Title_Case_With_Underscores_As_Spaces)."
            )
            self._require(self.ship_name != "", ship_name_format_message)
            self._require(
                " " not in self.ship_name, ship_name_format_message
            )
            if self.ship_name not in self.valid_ICES_ship_names:
                message = (
                    f"This `ship_name` {self.ship_name} does not"
                    " exist in the ICES database."
                )
                # Check for spell check using custom list; only needed once
                # we know the name is invalid.
                spell_check_list = get_close_matches(
                    self.ship_name,
                    self.valid_ICES_ship_names,
                    n=3,
                    cutoff=0.6,
                )
                if len(spell_check_list) > 0:
                    message += (
                        " Did you mean one of the"
                        f" following?\n{spell_check_list}"
                    )
                raise AssertionError(message)
        if "survey_name" in self.__dict__:
            self._require(
                self.survey_name != "", "Please provide a valid survey name."
            )
        if "echosounder" in self.__dict__:
            self._require(
                self.echosounder != "", "Please provide a valid echosounder."
            )
            self._require(
                self.echosounder in config.VALID_ECHOSOUNDERS,
                "Please provide a valid echosounder from the "
                f"following: {config.VALID_ECHOSOUNDERS}",
            )
        if "data_source" in self.__dict__:
            data_source_message = (
                "Please provide a valid data source from the "
                f"following: {config.VALID_DATA_SOURCES}"
            )
            self._require(self.data_source != "", data_source_message)
            self._require(
                self.data_source in config.VALID_DATA_SOURCES,
                data_source_message,
            )
        if "file_download_directory" in self.__dict__:
            self._require(
                self.file_download_directory != "",
                "Please provide a valid file download location (a directory).",
            )
            self._require(
                os.path.isdir(self.file_download_directory),
                f"File download location `{self.file_download_directory}` is"
                " not found to be a valid dir, please reformat it.",
            )
        if "gcp_bucket" in self.__dict__:
            self._require(
                self.gcp_bucket is not None,
                "Please provide a gcp_bucket object with"
                " `utils.cloud_utils.setup_gcp_storage()`",
            )

    # TODO: the Azure data lake existence checks are not implemented yet;
    # each placeholder below currently returns None.
    def _raw_file_exists_in_azure_data_lake(self): ...

    def _idx_file_exists_in_azure_data_lake(self): ...

    def _bot_file_exists_in_azure_data_lake(self): ...

    def _netcdf_file_exists_in_azure_data_lake(self): ...

    # ------------------------------------------------------------------
    # Public time helpers
    # ------------------------------------------------------------------
    def get_str_times(self) -> Union[dict, None]:
        """Gets the parsed times of the current file in dict format.

        Returns:
            dict: An OrderedDict containing all of the data collection times
                (based on name) as strings, for this file. If no times are
                found, returns None.
        """
        try:
            return OrderedDict(
                [
                    ("year", self.year_str),
                    ("month", self.month_str),
                    ("date", self.date_str),
                    ("hour", self.hour_str),
                    ("minute", self.minute_str),
                    ("second", self.second_str),
                ]
            )
        except AttributeError:
            # The time attributes only exist when the file name contained a
            # parseable datetime.
            return None

    def print_times(self) -> str:
        """Formats the parsed times of the current file.

        Despite its name, nothing is printed; the text is returned.

        Returns:
            str: The pretty print version of a string of the current file's
                data collection datetime (based on file name), or a message
                saying no datetime was found.
        """
        try:
            temp_dict = OrderedDict(
                [
                    ("year", self.year),
                    ("month", self.month),
                    ("date", self.date),
                    ("hour", self.hour),
                    ("minute", self.minute),
                    ("second", self.second),
                ]
            )
        except AttributeError:
            return f"No datetime string found in file name {self.file_name}."
        return pprint.pformat(temp_dict, indent=4)

    def get_file_datetime_str(self) -> str:
        """Gets the datetime as a datetime formatted string.

        Format: "%Y-%m-%d %H:%M:%S"

        Returns:
            str: The datetime formatted string or empty string if not found.
        """
        try:
            return (
                f"{self.year_str}-{self.month_str}-{self.date_str} "
                f"{self.hour_str}:{self.minute_str}:{self.second_str}"
            )
        except AttributeError:
            return ""