digital_land.pipeline package
Submodules
digital_land.pipeline.main module
- class digital_land.pipeline.main.EntityNumGen(entity_num_state: dict | None = None)
Bases: object
- next()
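The generator's internals are not documented here; the sketch below shows the pattern its signature suggests, assuming the state dict simply tracks the last number handed out (the key name `"current"` and the simple increment are assumptions, not the real implementation, which may seed per-dataset ranges):

```python
class EntityNumGen:
    """Hypothetical sketch: hand out sequential entity numbers,
    resuming from an optional externally held state dict."""

    def __init__(self, entity_num_state=None):
        # "current" is an assumed key; the real class's state shape is undocumented
        self.state = entity_num_state if entity_num_state is not None else {"current": 0}

    def next(self):
        self.state["current"] += 1
        return self.state["current"]
```

Holding the state in a caller-supplied dict lets number allocation resume across runs: `EntityNumGen({"current": 100}).next()` would continue from 101.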
- class digital_land.pipeline.main.Lookups(directory=None)
Bases: object
- add_entry(entry, is_new_entry=True)
The is_new_entry flag is an addition that allows for backward compatibility: older lookups may not be valid against the current minimal column requirements, so existing entries can bypass the stricter checks applied to new ones.
- get_max_entity(prefix, specification) → int
- load_csv(lookups_path=None)
Load the lookups in as a DataFrame. Not used when processing the pipeline, but useful for other analysis.
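A minimal sketch of what loading a lookups file as a DataFrame amounts to, assuming pandas and an illustrative column set (`prefix`, `reference`, `entity` are examples, not the confirmed schema); reading everything as strings avoids pandas inferring numeric types for identifier columns:

```python
import pandas as pd

def load_lookups_csv(lookups_path):
    """Sketch: read a lookup CSV into a DataFrame for ad-hoc analysis.
    dtype=str keeps identifiers (e.g. entity numbers) as strings."""
    return pd.read_csv(lookups_path, dtype=str)
```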
- save_csv(lookups_path=None, entries=None, old_entity_path=None)
- validate_entry(entry) → bool
- class digital_land.pipeline.main.Pipeline(path, dataset, specification=None, config=None)
Bases: object
- columns(resource='', endpoints=[])
- combine_fields(endpoints=None)
- static compose(phases)
- concatenations(resource=None, endpoints=[])
- default_fields(resource=None, endpoints=[])
- default_values(endpoints=None)
- file_reader(filename)
- filters(resource='', endpoints=[])
- get_pipeline_callback()
- init_logs(dataset, resource)
- load_column()
- load_combine_fields()
- load_concat()
- load_default_fields()
- load_default_values()
- load_filter()
- load_lookup()
- load_migrate()
- load_patch()
- load_redirect_lookup()
- load_skip_patterns()
- lookups(resource=None)
- migrations()
- patches(resource='', endpoints=[])
- reader(filename)
- redirect_lookups()
- run(*phases)
- static run_phases(*phases)
Execute a sequence of phases by composing their .process() pipelines.
Phases are expected to be objects with a process(iterable) method. Historically the chain has been started with None.
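The contract described above can be sketched as follows; `SourcePhase` and `UppercasePhase` are illustrative stand-ins for real pipeline phases, not classes from this package:

```python
def run_phases(*phases):
    # Compose each phase's process() over the previous phase's output.
    # As noted above, the chain historically starts with None, so the
    # first phase must tolerate a None input.
    stream = None
    for phase in phases:
        stream = phase.process(stream)
    return stream

class SourcePhase:
    """Illustrative first phase: ignores the (None) input and emits rows."""
    def __init__(self, rows):
        self.rows = rows
    def process(self, stream):
        yield from self.rows

class UppercasePhase:
    """Illustrative downstream phase: transforms rows from the previous phase."""
    def process(self, stream):
        for row in stream:
            yield row.upper()
```

Because each `process()` is a generator, nothing runs until the final stream is consumed, e.g. `list(run_phases(SourcePhase(["a", "b"]), UppercasePhase()))`.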
- save_logs(issue_path=None, operational_issue_path=None, column_field_path=None, dataset_resource_path=None, converted_resource_path=None)
Save logs to respective paths. Only saves when path is provided (not None).
- Returns:
True if all logs saved successfully, False otherwise.
- Return type:
bool
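The save-only-when-a-path-is-given behaviour amounts to the following pattern (the `(log, path)` pairing and `save_fn` callback are illustrative, not the method's actual internals):

```python
def save_logs_sketch(logs_and_paths, save_fn):
    """Sketch: save each log only when its path is provided; report overall success.

    logs_and_paths: iterable of (log, path) pairs, path may be None.
    save_fn(log, path) -> bool: illustrative save callback.
    """
    all_saved = True
    for log, path in logs_and_paths:
        if path is None:
            continue  # a skipped log is not a failure
        all_saved = save_fn(log, path) and all_saved
    return all_saved
```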
- skip_patterns(resource='', endpoints=[])
- transform(input_path: str, output_path: Path, organisation: Organisation, resource: str, valid_category_values: Dict, endpoints: List[str] | None = None, organisations: List[str] | None = None, entry_date: str = '', converted_path: str | None = None, harmonised_output_path: str | None = None, save_harmonised: bool = False, disable_lookups: bool = False) → IssueLog
Build and run the default resource -> transformed phase list.
This mirrors the legacy commands.pipeline_run() phase wiring, but keeps the execution responsibility inside Pipeline.
- Parameters:
input_path (str) -- Path to the input resource CSV file to transform (i.e. collection/resource/{file-hash}).
output_path (Path) -- Path where the final transformed CSV will be written (i.e. transformed/{dataset-name}/{file-hash}.csv).
organisation (Organisation) -- Organisation object containing org-specific lookups and mappings.
resource (str) -- Resource file identifier (hash). TBD: this parameter may be removable.
valid_category_values (dict) -- Dictionary of valid category values per field from the API/specification.
endpoints (list, optional) -- List of endpoint hashes/identifiers for this resource. Defaults to None.
organisations (list, optional) -- List of organisation codes/identifiers associated with the resource. Defaults to None. Note: if a single organisation is passed, it becomes the default organisation value.
entry_date (str, optional) -- Default entry-date value to apply to all records. Defaults to "".
converted_path (str, optional) -- Path to save converted (pre-normalised) resource. Defaults to None.
harmonised_output_path (str, optional) -- Path to save the harmonised/intermediate output. Defaults to None.
save_harmonised (bool, optional) -- Whether to save the harmonised intermediate output. Defaults to False.
disable_lookups (bool, optional) -- Whether to disable the entity lookup and pruning phases (useful for checking data before lookups are applied). Defaults to False.
- Returns:
The completed issue log containing all data quality issues found during transformation.
- Return type:
IssueLog
- class digital_land.pipeline.main.PipelineStatus(value)
Bases: Enum
An enumeration.
- COMPLETE = 3
- ERROR = 4
- FAILED = 5
- INITIALISED = 1
- RUNNING = 2
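The values above correspond to a standard Python Enum; reproduced here for illustration, since the integer values matter when a status is stored and later recovered:

```python
from enum import Enum

class PipelineStatus(Enum):
    INITIALISED = 1
    RUNNING = 2
    COMPLETE = 3
    ERROR = 4
    FAILED = 5

# A status can be recovered from its stored integer value:
status = PipelineStatus(3)  # PipelineStatus.COMPLETE
```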
- digital_land.pipeline.main.chain_phases(phases)
- digital_land.pipeline.main.run_pipeline(*args)
Backward-compatible wrapper.
Prefer calling Pipeline.run(*phases) on a configured Pipeline instance.
digital_land.pipeline.process module
- digital_land.pipeline.process.convert_tranformed_csv_to_pq(input_path, output_path)
Function to convert a transformed resource to a Parquet file.
Module contents
Sub-package containing code for processing resources into transformed resources.