#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This module defines a rastertool named Filtering that can apply different kind
of filters on raster images.
"""
import logging.config
from typing import List, Dict
from pathlib import Path
from eolab.georastertools import utils
from eolab.georastertools import Rastertool, Windowable
from eolab.georastertools.processing import algo
from eolab.georastertools.processing import RasterFilter, compute_sliding
from eolab.georastertools.product import RasterProduct
_logger = logging.getLogger(__name__)
[docs]class Filtering(Rastertool, Windowable):
"""Raster tool that applies a filter on a raster image.
Predefined filters are available:
- Median filter
- Local sum
- Local mean
- Adaptive gaussian filter.
A filter is applied on a kernel of a configurable size. To set the kernel size, you need
to call:
.. code-block:: python
tool = Filtering(Filtering.median_filter, kernel_size=16)
Custom additional filters can be easily added by instanciating a RasterFilter (see
:obj:`eolab.georastertools.processing.RasterFilter`).
The parameters that customize the filter algo can be passed by following this procedure:
.. code-block:: python
def myalgo(input_data, **kwargs):
# kwargs contain the values of the algo parameters
kernel_size = kwargs.get('kernel_size', 8)
myparam = kwargs.get('myparam', 1)
# ... do someting ...
my_filter = RasterFilter(
"myfilter", algo=myalgo
).with_documentation(
help="Apply my filter",
description="Apply my filter"
).with_arguments({
"myparam": {
# configure the command line interface for this param, this can be skipped
"default": 8,
"required": True,
"type": int,
"help": "my configuration param for myalgo"
}
})
# create the raster tool
tool = Filtering(my_filter, kernel_size=16)
my_filter.with_output(".")
my_filter.with_window(1024, "edge")
# set the configuration parameter of the algo
my_filter.with_filter_configuration({"myparam": 1024})
# run the tool
tool.process_file("./mytif.tif")
"""
median_filter = RasterFilter(
"median", algo=algo.median
).with_documentation(
help="Apply median filter",
description="Apply a median filter (see scipy median_filter for more information)"
)
"""
Applies a Median Filter to the input data using
`scipy.ndimage.median_filter <https://docs.scipy.org/doc/scipy/reference/generated/scipy.ndimage.median_filter.html>`_.
The filter computes the median contained in the sliding window determined by kernel_size.
Returns:
Numpy array containing the input data filtered by the median filter
"""
local_sum = RasterFilter(
"sum", algo=algo.local_sum
).with_documentation(
help="Apply local sum filter",
description="Apply a local sum filter using integral image method"
)
"""Computes the local sums of the input data.
Each element is the sum of the pixels contained in the sliding window determined by kernel_size.
Returns:
Numpy array of the size of input_data containing the computed local sums
"""
local_mean = RasterFilter(
"mean", algo=algo.local_mean
).with_documentation(
help="Apply local mean filter",
description="Apply a local mean filter using integral image method",
)
"""Computes the local means of the input data.
Each element is the mean of the pixels contained in the sliding window determined by kernel_size.
Returns:
Numpy array of the size of input_data containing the computed local means
"""
adaptive_gaussian = RasterFilter(
"adaptive_gaussian", algo=algo.adaptive_gaussian, per_band_algo=True
).with_documentation(
help="Apply adaptive gaussian filter",
description="Apply an adaptive (Local gaussian of 3x3) recursive filter on the input image",
).with_arguments({
"sigma": {
"default": 1,
"required": True,
"type": float,
"help": "Standard deviation of the Gaussian distribution (sigma)"
},
})
"""RasterFilter that applies an adaptive gaussian filter to the kernel. The parameter sigma defines the standard deviation
of the Gaussian distribution.
Returns:
Numpy array containing the input data filtered by the Gaussian filter.
"""
[docs] @staticmethod
def get_default_filters():
"""Get the list of predefined raster filters
Returns:
[:obj:`eolab.georastertools.processing.RasterFilter`] List of the predefined
raster filters ([Median, Local sum, Local mean, Adaptive gaussian])
"""
return [
Filtering.median_filter, Filtering.local_sum,
Filtering.local_mean, Filtering.adaptive_gaussian
]
[docs] def __init__(self, raster_filter: RasterFilter, kernel_size: int, bands: List[int] = [1]):
"""Constructor
Args:
raster_filter (:obj:`eolab.georastertools.processing.RasterFilter`):
The instance of RasterFilter to apply
kernel_size (int):
Size of the kernel on which the filter is applied
bands ([int], optional, default=[1]):
List of bands in the input image to process.
Set None if all bands shall be processed.
"""
super().__init__()
# initialize default windowing configuration
self.with_windows()
# the raster filter processing
self._raster_filter = raster_filter
self._raster_filter.configure({"kernel_size": kernel_size})
self._bands = bands
@property
def bands(self) -> List[int]:
"""List of bands to process"""
return self._bands
@property
def raster_filter(self) -> RasterFilter:
"""Name of the filter to apply to the raster"""
return self._raster_filter
[docs] def with_filter_configuration(self, argsdict: Dict):
"""Configure the filter with its specific arguments' values
Args:
argsdict (dict):
Dictionary of filter's arguments names and values. Shall contain at least
the kernel size (key="kernel")
Returns:
:obj:`eolab.georastertools.Filtering`: The current instance so that it is
possible to chain the with... calls (fluent API)
"""
self.raster_filter.configure(argsdict)
return self
[docs] def process_file(self, inputfile: str) -> List[str]:
"""Apply the filter to the input file
Args:
inputfile (str):
Input image to process
Returns:
([str]) A list of one element containing the path of the generated filtered image.
"""
_logger.info(f"Processing file {inputfile}")
overlap = (self.raster_filter.kernel_size + 1) // 2
if overlap >= min(self.window_size) / 2:
raise ValueError("The kernel size (option --kernel_size, "
f"value={self.raster_filter.kernel_size}) "
"must be strictly less than the window size minus 1 "
f"(option --window_size, value={min(self.window_size)})")
# STEP 1: Prepare the input image so that it can be processed
with RasterProduct(inputfile, vrt_outputdir=self.vrt_dir) as product:
# STEP 2: apply filter
outdir = Path(self.outputdir)
output_image = outdir.joinpath(
f"{utils.get_basename(inputfile)}-{self.raster_filter.name}.tif")
compute_sliding(
product.get_raster(), output_image, self.raster_filter,
window_size=self.window_size,
window_overlap=(self.raster_filter.kernel_size + 1) // 2,
pad_mode=self.pad_mode,
bands=self.bands)
return [output_image.as_posix()]