# -*- coding: utf-8 -*-
"""
Here, the ``scope`` library is described. This allows you to use specific parts
of ``scope`` from other programs.
``scope`` consists of several main classes. Note that most of them provide
Python access to ``cdo`` calls via Python's built-in ``subprocess`` module.
Without a correctly installed ``cdo``, many of these functions/classes will not
work.
Here, we provide a quick summary, but please look at the documentation for each
function and class for more complete information. The following functions are
defined:
* ``determine_cdo_openMP`` -- using ``cdo --version``, determines if you have
openMP support.
The following classes are defined here:
* ``Scope`` -- an abstract base class useful for starting other classes from.
This provides a way to determine if ``cdo`` has openMP support or not by
parsing ``cdo --version``. Additionally, it has a nested class which gives
you decorators to put around methods for enabling arbitrary shell calls
before and after the method is executed, which can be configured via the
``Scope.config`` dictionary.
* ``Preprocess`` -- a class to extract and combine various NetCDF files for
further processing.
* ``Regrid`` -- a class to easily regrid from one model to another, depending
on the specifications in the ``scope_config.yaml``
"""
from functools import wraps
import logging
import os
import re
import subprocess
import click
[docs]def determine_cdo_openMP():
"""
Checks if the ``cdo`` version being used supports ``OpenMP``; useful to
check if you need a ``-P`` flag or not.
Parameters
----------
None
Returns
-------
bool :
True if ``OpenMP`` is listed in the Features of ``cdo``, otherwise
False
"""
cmd = "cdo --version"
cdo_ver = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
cdo_ver = cdo_ver.decode("utf-8")
for line in cdo_ver.split("\n"):
if line.startswith("Features"):
return "OpenMP" in line
return False
[docs]class Scope(object):
def __init__(self, config, whos_turn):
"""
Base class for various Scope objects. Other classes should extend this one.
Parameters
----------
config : dict
A dictionary (normally recieved from a YAML file) describing the
``scope`` configuration. An example dictionary is included in the root
directory under ``examples/scope_config.yaml``
whos_turn : str
An explicit model name telling you which model is currently
interfacing with ``scope`` e.g. ``echam`` or ``pism``.
Warning
-------
This function has a filesystem side-effect: it generates the couple
folder defined in ``config["scope"]["couple_dir"]``. If you don't have
permissions to create this folder, the object initialization will
fail...
Some design features are listed below:
* **``pre`` and ``post`` hooks**
--------------------------------
Any appropriately decorated method of a ``scope`` object has a hook to
call a script with specific arguments and flags before and after the
main scope method call. Best explained by an example. Assume your Scope
subclass has a method "preprocess". Here is the order the program will
execute in, given the following configuration:
.. code :: yaml
pre_preprocess:
program: /some/path/to/an/executable
args:
- list
- of
- arguments
flags:
- "--flag value1"
- "--different_flag value2"
post_preprocess:
program: /some/other/path
args:
- A
- B
- C
flags:
- "--different_flag value3"
Given this configuration, an idealized system call would look like the
example shown below. Note however that the Python program calls the
shell and immediately destroys it again, so any variables exported to
the environment (probably) don't survive:
.. code :: shell
$ ./pre_preprocess['program'] list of arguments --flag value1 --different_flag value2
$ <... python call to preprocess method ...>
$ ./post_preprocess['program'] A B C --different_flag value 3
"""
self.config = config
self.whos_turn = whos_turn
if not os.path.isdir(config["scope"]["couple_dir"]):
os.makedirs(config["scope"]["couple_dir"])
[docs] def get_cdo_prefix(self, has_openMP=None):
"""
Return a string with an appropriate ``cdo`` prefix for using OpenMP
with the ``-P`` flag.
Parameters
----------
has_openMP : bool
Default is ``None``. You can explicitly override the ability of
``cdo`` to use the ``-P`` flag. If set to ``True``, the config must
have an entry under ``config[scope][number openMP processes]``
defining how many openMP processes to use (should be an int)
Returns
-------
str :
A string which should be used for the ``cdo`` call, either with or
without ``-P X``, where ``X`` is the number of openMP processes to
use.
"""
if not has_openMP:
has_openMP = determine_cdo_openMP()
if has_openMP:
return "cdo -P " + str(self.config["scope"]["number openMP processes"])
else:
return "cdo"
[docs] class ScopeDecorators(object):
"""Contains decorators you can use on class methods"""
# FIXME: I don't really like that this is exactly the same above with
# only a positional change. Probably it can abstracted away...
[docs] @staticmethod
def _wrap_hook(self, meth):
program_to_call = self.config[self.whos_turn].get("pre_" + meth.__name__, {}).get("program")
flags_for_program = self.config[self.whos_turn].get("pre_" + meth.__name__, {}).get("flags_for_program")
arguments_for_program = (
self.config[self.whos_turn].get("pre_" + meth.__name__, {}).get("arguments_for_program")
)
full_process = program_to_call
if flags_for_program:
full_process += flags_for_program
if arguments_for_program:
full_process += arguments_for_program
subprocess.run(full_process, shell=True, check=True)
# PG: Why is it a classmethod? I don't understand this yet...
[docs] @classmethod
def pre_hook(cls, meth):
""" Based upon the ``self.config``, runs a specific system command
Using the method name, you can define
"""
# Did you ask yourself -- What's wraps? See:
# https://www.thecodeship.com/patterns/guide-to-python-function-decorators/
@wraps(meth)
def wrapped_meth(self, *args):
self._wrap_hook(self, meth)
meth(self, *args)
return wrapped_meth
[docs] @classmethod
def post_hook(cls, meth):
@wraps(meth)
def wrapped_meth(self, *args):
meth(*args)
self._wrap_hook(self, meth)
return wrapped_meth
[docs]class Preprocess(Scope):
"""
Subclass of ``Scope`` which enables preprocessing of models via ``cdo``.
Use the ``preprocess`` method after building a ``Precprocess`` object.
"""
[docs] @Scope.ScopeDecorators.pre_hook
@Scope.ScopeDecorators.post_hook
def preprocess(self):
for (sender_type, sender_args) in self._all_senders():
for variable_name, variable_dict in sender_args.items():
logging.debug("The following files will be used for:", variable_name)
self._make_tmp_files_for_variable(variable_name, variable_dict)
self._combine_tmp_variable_files(sender_type)
[docs] def _all_senders(self):
"""
A generator giving tuples of the *reciever_type* (e.g. ice, atmosphere,
ocean, solid earth), and the *configuration for the reciever type*,
including variables and corresponding specifications for which files to
use and how to process them.
Example
-------
Here is an example for the reciever specification dictionary. See the
documentation regarding ``scope`` configuration for further
information:
.. code::
temp2:
files:
pattern: "{{ EXP_ID }}_echam6_echam_{{ DATE_PATTERN }}.grb"
take:
newest: 12
code table: "echam6"
aprl:
files:
dir: "/work/ollie/pgierz/scope_tests/outdata/echam/"
pattern: "{{ EXP_ID }}_echam6_echam_{{ DATE_PATTERN }}.grb"
take:
newest: 12
code table: "/work/ollie/pgierz/scope_tests/outdata/echam/PI_1x10_185001.01_echam.codes"
Yields
------
tuple of (str, dict)
The first element of the tuple, ``reciever_type``, is a string
describing what sort of model should get this data; e.g. "ice",
"atmosphere"
The second element, ``reciever_spec``, is a dictionary describing
which files should be used.
"""
if self.config[self.whos_turn].get("send"):
for reciever_type in self.config[self.whos_turn].get("send"):
reciever_spec = self.config[self.whos_turn]["send"][reciever_type]
yield (reciever_type, reciever_spec)
[docs] def _construct_filelist(self, var_dict):
"""
Example
-------
The variable configuration dictionary can have the following top-level **keys**:
* ``files`` may contain:
* a ``filepattern`` in regex to look for
* ``take`` which files to take, either specific, or
``newest``/``latest`` followed by an integer.
* ``dir`` a directory where to look for the files. Note that if
this is not provided, the default is to fall back to the top level
``outdata_dir`` for the currently sending model.
"""
r = re.compile(var_dict["files"]["pattern"])
file_directory = var_dict["files"].get("dir", self.config[self.whos_turn].get("outdata_dir"))
all_files = []
for rootname, _, filenames in os.walk(file_directory):
for filename in filenames:
full_name = os.path.join(rootname, filename)
all_files.append(full_name)
break # Do not go into subdirectories
# Just the matching files:
matching_files = sorted([f for f in all_files if r.match(os.path.basename(f))])
if "take" in var_dict["files"]:
if "newest" in var_dict["files"]["take"]:
take = var_dict["files"]["take"]["newest"]
return matching_files[-take:]
elif "oldest" in var_dict["files"]["take"]:
take = var_dict["files"]["take"]["oldest"]
return matching_files[:take]
# FIXME: This is wrong:
elif "specific" in var_dict["files"]["take"]:
return var_dict["files"]["take"]["specific"]
else:
return matching_files
[docs] def _make_tmp_files_for_variable(self, varname, var_dict):
"""
Generates temporary files for further processing with ``scope``.
Given a variable name and a description dictionary of how it should be
extracted and processed, this method makes a temporary file,
``<sender_name>_<varname>_file_for_scope.nc``, e.g.
``echam_temp2_file_for_scope.nc`` in the ``couple_dir``.
Parameters
----------
varname : str
Variable name as that should be selected from the files
var_dict : dict
A configuration dictionary describing how the variable should be
extracted. An example is given in ``_construct_filelist``.
Notes
-----
In addition to the dictionary description of ``files``, further
information may be added with the following top-level keys:
* ``code table`` describing which ``GRIB`` code numbers correspond to
which variables. If not given, the fallback value is the value of
``code table`` in the sender configuration.
Converts any input file to ``nc`` via `cdo`. Runs both ``select`` and
``settable``.
Returns
-------
None
"""
flist = self._construct_filelist(var_dict)
for f in flist:
print("- ", f)
code_table = var_dict.get("code table", self.config[self.whos_turn].get("code table"))
cdo_command = (
self.get_cdo_prefix()
+ " -f nc -t "
+ code_table
+ " -select,name="
+ varname
+ " "
+ " ".join(flist)
+ " "
+ self.config["scope"]["couple_dir"]
+ "/"
+ self.whos_turn
+ "_"
+ varname
+ "_file_for_scope.nc"
)
click.secho("Selecting %s for further processing with SCOPE..." % varname, fg="cyan")
click.secho(cdo_command, fg="cyan")
subprocess.run(cdo_command, shell=True, check=True)
# TODO/FIXME: This function does not work correctly if there are different
# time axis for each variable. It might be better to just leave each
# variable in it's own file.
[docs] def _combine_tmp_variable_files(self, reciever_type):
"""
Combines all files in the couple directory for a particular reciever type.
Depending on the configuration, this method combines all files found in
the ``couple_dir`` which may have been further processed by ``scope``
to a file ``<sender_type>_file_for_<reciever_type>.nc``
Parameters
----------
reciever_type : str
Which reciever the model is sending to, e.g. ice, ocean, atmosphere
Returns
-------
None
Notes
-----
This executes a ``cdo mergetime`` command to concatenate all files found which
should be sent to particular model.
"""
print(reciever_type)
reciever = self.config.get(self.whos_turn, {}).get("send", {}).get(reciever_type, {})
variables_to_send_to_reciever = list(reciever)
files_to_combine = []
for f in os.listdir(self.config["scope"]["couple_dir"]):
fvar = f.replace(self.whos_turn + "_", "").replace("_file_for_scope.nc", "")
if fvar in variables_to_send_to_reciever:
files_to_combine.append(os.path.join(self.config["scope"]["couple_dir"], f))
output_file = os.path.join(
self.config["scope"]["couple_dir"],
self.config[self.whos_turn]["type"] + "_file_for_" + reciever_type + ".nc",
)
cdo_command = self.get_cdo_prefix() + " mergetime " + " ".join(files_to_combine) + " " + output_file
click.secho("Combine files for sending to %s" % reciever_type, fg="cyan")
click.secho(cdo_command, fg="cyan")
subprocess.run(cdo_command, shell=True, check=True)
[docs]class Regrid(Scope):
[docs] def _calculate_weights(self, Model, Type, Interp):
regrid_weight_file = os.path.join(
self.config["scope"]["couple_dir"], "_".join([self.config[Model]["type"], Type, Interp, "weight_file.nc"])
)
cdo_command = (
self.get_cdo_prefix()
+ " gen"
+ Interp
+ ","
+ self.config["scope"]["couple_dir"]
+ "/"
+ self.config[Model]["griddes"]
+ " "
+ self.config["scope"]["couple_dir"]
+ "/"
+ Type
+ "_file_for_"
+ self.config[Model]["type"]
+ ".nc"
+ " "
+ regrid_weight_file
)
if not os.path.isfile(regrid_weight_file):
click.secho("Calculating weights: ", fg="cyan")
click.secho(cdo_command, fg="cyan")
subprocess.run(cdo_command, shell=True, check=True)
return regrid_weight_file
[docs] def regrid(self):
if self.config[self.whos_turn].get("recieve"):
for sender_type in self.config[self.whos_turn].get("recieve"):
if self.config[self.whos_turn]["recieve"].get(sender_type):
for Variable in self.config[self.whos_turn]["recieve"].get(sender_type):
Model = self.whos_turn
Type = sender_type
Interp = self.config[self.whos_turn].get("recieve").get(sender_type).get(Variable).get("interp")
self.regrid_one_var(Model, Type, Interp, Variable)
[docs] def regrid_one_var(self, Model, Type, Interp, Variable):
weight_file = self._calculate_weights(Model, Type, Interp)
cdo_command = (
self.get_cdo_prefix()
+ " remap,"
+ self.config[Model]["griddes"]
+ ","
+ weight_file
+ " "
+ "-selvar,"
+ Variable
+ " "
+ Type
+ "_file_for_"
+ self.config[Model]["type"]
+ ".nc "
+ Type
+ "_"
+ Variable
+ "_for_"
+ self.config[Model]["type"]
+ "_on_"
+ self.config[Model]["type"]
+ "_grid.nc"
)
click.secho("Remapping: ", fg="cyan")
click.secho(cdo_command, fg="cyan")
subprocess.run(cdo_command, shell=True, check=True)