digital_land.pipeline package

Submodules

digital_land.pipeline.main module

class digital_land.pipeline.main.EntityNumGen(entity_num_state: dict | None = None)

Bases: object

next()
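A minimal sketch of how a stateful entity-number generator of this shape might behave. The state-dictionary keys (`current`, `range_min`, `range_max`) and the wrapping behaviour are illustrative assumptions, not the library's documented internals:

```python
class EntityNumGenSketch:
    """Illustrative stand-in: hands out sequential entity numbers
    constrained to a configured [minimum, maximum] range."""

    def __init__(self, entity_num_state=None):
        state = entity_num_state or {}
        # Assumed state keys; the real EntityNumGen may use different ones.
        self.current = state.get("current", 0)
        self.minimum = state.get("range_min", 1)
        self.maximum = state.get("range_max", 100)

    def next(self):
        # Advance, snapping back into range when the counter falls outside it.
        self.current += 1
        if self.current < self.minimum or self.current > self.maximum:
            self.current = self.minimum
        return self.current


gen = EntityNumGenSketch({"range_min": 10, "range_max": 12})
print(gen.next(), gen.next(), gen.next(), gen.next())  # prints: 10 11 12 10
```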
class digital_land.pipeline.main.Lookups(directory=None)

Bases: object

add_entry(entry, is_new_entry=True)

The is_new_entry flag exists for backward compatibility: older lookups may not be valid against the current minimal column requirements, so validation can be relaxed for entries that are not new.

get_max_entity(prefix, specification) int
load_csv(lookups_path=None)

Load the lookups CSV into a DataFrame. This is not used when processing the pipeline, but is useful for other analysis.

save_csv(lookups_path=None, entries=None, old_entity_path=None)
validate_entry(entry) bool
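The add_entry / validate_entry pair can be sketched as follows. The required column names and the exact validation rule are assumptions for illustration; only the backward-compatibility behaviour of the is_new_entry flag mirrors the docstring above:

```python
# Assumed minimal columns; the real specification may require others.
REQUIRED_COLUMNS = {"prefix", "reference", "entity"}


class LookupsSketch:
    """Illustrative stand-in for the Lookups add_entry/validate_entry pair."""

    def __init__(self):
        self.entries = []

    def validate_entry(self, entry):
        # An entry is valid when every required column is present and non-empty.
        return all(entry.get(col) for col in REQUIRED_COLUMNS)

    def add_entry(self, entry, is_new_entry=True):
        # New entries must pass validation; older entries are accepted as-is
        # so pre-existing lookup files keep loading.
        if is_new_entry and not self.validate_entry(entry):
            raise ValueError(f"invalid lookup entry: {entry}")
        self.entries.append(entry)
```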
class digital_land.pipeline.main.Pipeline(path, dataset, specification=None, config=None)

Bases: object

columns(resource='', endpoints=[])
combine_fields(endpoints=None)
static compose(phases)
concatenations(resource=None, endpoints=[])
default_fields(resource=None, endpoints=[])
default_values(endpoints=None)
file_reader(filename)
filters(resource='', endpoints=[])
get_pipeline_callback()
init_logs(dataset, resource)
load_column()
load_combine_fields()
load_concat()
load_default_fields()
load_default_values()
load_filter()
load_lookup()
load_migrate()
load_patch()
load_redirect_lookup()
load_skip_patterns()
lookups(resource=None)
migrations()
patches(resource='', endpoints=[])
reader(filename)
redirect_lookups()
run(*phases)
static run_phases(*phases)

Execute a sequence of phases by composing their .process() pipelines.

Phases are expected to be objects with a process(iterable) method. Historically the chain has been started with None.
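The composition described above can be sketched with stand-in phases. In the real pipeline the first phase typically produces its own rows (hence the chain historically starting from None); here a plain iterable seeds the chain, and the phase classes are hypothetical:

```python
class UppercasePhase:
    """Hypothetical phase: upper-cases each row."""

    def process(self, stream):
        for row in stream:
            yield row.upper()


class SuffixPhase:
    """Hypothetical phase: appends a suffix to each row."""

    def __init__(self, suffix):
        self.suffix = suffix

    def process(self, stream):
        for row in stream:
            yield row + self.suffix


def run_phases_sketch(source, *phases):
    # Compose each phase's .process() generator over the previous one,
    # then drain the resulting pipeline.
    stream = source
    for phase in phases:
        stream = phase.process(stream)
    return list(stream)


print(run_phases_sketch(["a", "b"], UppercasePhase(), SuffixPhase("!")))
# ['A!', 'B!']
```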

save_logs(issue_path=None, operational_issue_path=None, column_field_path=None, dataset_resource_path=None, converted_resource_path=None)

Save logs to respective paths. Only saves when path is provided (not None).

Returns:

True if all logs saved successfully, False otherwise.

Return type:

bool

skip_patterns(resource='', endpoints=[])
transform(input_path: str, output_path: Path, organisation: Organisation, resource: str, valid_category_values: Dict, endpoints: List[str] | None = None, organisations: List[str] | None = None, entry_date: str = '', converted_path: str | None = None, harmonised_output_path: str | None = None, save_harmonised: bool = False, disable_lookups: bool = False) IssueLog

Build and run the default resource -> transformed phase list.

This mirrors the legacy commands.pipeline_run() phase wiring, but keeps the execution responsibility inside Pipeline.

Parameters:
  • input_path (str) -- Path to the input resource CSV file to transform (i.e. collection/resource/{file-hash}).

  • output_path (Path) -- Path where the final transformed CSV will be written (i.e. transformed/{dataset-name}/{file-hash}.csv).

  • organisation (Organisation) -- Organisation object containing org-specific lookups and mappings.

  • resource (str) -- Resource file identifier (hash); TBD whether this parameter can be removed.

  • valid_category_values (dict) -- Dictionary of valid category values per field from the API/specification.

  • endpoints (list, optional) -- List of endpoint hashes/identifiers for this resource. Defaults to None.

  • organisations (list, optional) -- List of organisation codes/identifiers associated with the resource. Defaults to None. Note that if a single organisation is passed, it becomes the default organisation value.

  • entry_date (str, optional) -- Default entry-date value to apply to all records. Defaults to "".

  • converted_path (str, optional) -- Path to save converted (pre-normalised) resource. Defaults to None.

  • harmonised_output_path (str, optional) -- Path to save the harmonised/intermediate output. Defaults to None.

  • save_harmonised (bool, optional) -- Whether to save the harmonised intermediate output. Defaults to False.

  • disable_lookups (bool, optional) -- Whether to disable entity lookups and pruning phases. Defaults to False. (useful for checking data before lookups are applied)

Returns:

The completed issue log containing all data quality issues found during transformation.

Return type:

IssueLog

class digital_land.pipeline.main.PipelineStatus(value)

Bases: Enum

An enumeration.

COMPLETE = 3
ERROR = 4
FAILED = 5
INITIALISED = 1
RUNNING = 2
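The status values above can be reproduced as a standard Enum. The `is_terminal` helper below is an illustrative addition, not part of the documented API:

```python
from enum import Enum


class PipelineStatus(Enum):
    INITIALISED = 1
    RUNNING = 2
    COMPLETE = 3
    ERROR = 4
    FAILED = 5


def is_terminal(status):
    # Hypothetical helper: treat COMPLETE, ERROR and FAILED as end states.
    return status in (
        PipelineStatus.COMPLETE,
        PipelineStatus.ERROR,
        PipelineStatus.FAILED,
    )


print(PipelineStatus(2))                    # PipelineStatus.RUNNING
print(is_terminal(PipelineStatus.RUNNING))  # False
```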
digital_land.pipeline.main.chain_phases(phases)
digital_land.pipeline.main.run_pipeline(*args)

Backward compatible wrapper.

Prefer calling Pipeline.run(*phases) on a configured Pipeline instance.

digital_land.pipeline.process module

digital_land.pipeline.process.convert_tranformed_csv_to_pq(input_path, output_path)

Convert a transformed resource CSV to a Parquet file.

Module contents

Sub-package containing code for processing resources into transformed resources.