digital_land package

Subpackages

Submodules

digital_land.api module

class digital_land.api.API(specification: Specification, url: str = 'https://files.planning.data.gov.uk', cache_dir: str = 'var/cache')

Bases: object

class Extension(value)

Bases: str, Enum

An enumeration.

CSV = 'csv'
SQLITE3 = 'sqlite3'
download_dataset(dataset: str, overwrite: bool = False, path: str | None = None, extension: Extension = Extension.CSV)

Downloads a dataset in CSV or SQLite3 format.

Parameters:
  • dataset -- dataset name.

  • overwrite -- overwrite the file if it already exists (otherwise the call just returns).

  • path -- file to download to (otherwise <cache-dir>/dataset/<dataset-name>.<extension>).

  • extension -- 'csv' or 'sqlite3'; 'csv' by default.

Returns:
  None. The file is downloaded to the given path or to the cache, unless an exception occurs.
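The default-path and overwrite rules described for download_dataset can be sketched as follows. This is a minimal illustration, not the library's code: resolve_download_path and should_download are hypothetical helpers, and only the Extension values, the 'var/cache' default and the <cache-dir>/dataset/<dataset-name>.<extension> layout are taken from the reference above.

```python
from enum import Enum
from pathlib import Path


class Extension(str, Enum):
    """Mirrors the documented API.Extension values."""
    CSV = "csv"
    SQLITE3 = "sqlite3"


def resolve_download_path(dataset, path=None, extension=Extension.CSV, cache_dir="var/cache"):
    """Hypothetical helper: an explicit path wins, otherwise use the cache layout."""
    if path is not None:
        return Path(path)
    return Path(cache_dir) / "dataset" / f"{dataset}.{extension.value}"


def should_download(target, overwrite=False):
    """Hypothetical helper: skip the download when the file exists and overwrite is False."""
    return overwrite or not Path(target).exists()


# Example: where a CSV download of a dataset would land by default.
default_target = resolve_download_path("conservation-area")
```

Keeping path resolution separate from the download itself makes the "return early if the file exists" rule easy to check without touching the network.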

get_valid_category_values(dataset: str, pipeline: Pipeline) → Mapping[str, Iterable[str]]

Gets the valid category values.

Parameters:
  • category_fields -- iterable of category fields to get valid values for.

Returns:
  Mapping of field to valid values. If the valid values cannot be obtained, that field will be omitted.
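The "omit the field when its values cannot be obtained" behaviour can be sketched like this. It is an illustration under stated assumptions: collect_valid_values and fake_fetch are hypothetical names, the sample data is invented, and the real method may obtain its values in a different way.

```python
def collect_valid_values(category_fields, fetch_values):
    """Build a field -> valid values mapping, omitting fields whose lookup fails.

    fetch_values is a hypothetical callable standing in for the real lookup.
    """
    valid = {}
    for field in category_fields:
        try:
            valid[field] = list(fetch_values(field))
        except (LookupError, OSError):
            # Values could not be obtained: leave the field out of the mapping.
            continue
    return valid


def fake_fetch(field):
    # Stand-in data source, purely illustrative.
    data = {"listed-building-grade": ["I", "II", "II*"]}
    return data[field]  # raises KeyError for unknown fields


result = collect_valid_values(["listed-building-grade", "unknown-field"], fake_fetch)
```

Callers should therefore test for a field's presence in the mapping rather than assume every requested field has an entry.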

digital_land.check module

digital_land.check.duplicate_reference_check(issues=None, csv_path=None)

digital_land.cli module

digital_land.collect module

class digital_land.collect.Collector(dataset='', collection_dir=None)

Bases: object

collect(endpoint_path, refill_todays_logs=False)
collection_dir_file_hashes(collection_dir=None)
fetch(url, endpoint=None, log_datetime=datetime.datetime(2025, 8, 20, 15, 28, 23, 263806), end_date='', plugin='', refill_todays_logs=False)
get(url, log={}, verify_ssl=True, plugin='get')
log_dir = 'collection/log/'
log_path(log_datetime, endpoint)
resource_dir = 'collection/resource/'
save(path, data, refill_todays_logs=False)
save_content(content)
save_log(path, log, refill_todays_logs=False)
save_resource(content, url, log)
strip_variable_content(content)
url_endpoint(url)
user_agent = 'MHCLG Planning Data Collector'
class digital_land.collect.FetchStatus(value)

Bases: Enum

An enumeration.

ALREADY_FETCHED = 4
EXPIRED = 2
FAILED = 5
HASH_FAILURE = 3
OK = 1
digital_land.collect.hash_file(path)

Returns the hash of a file as a hex digest.
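A function with this contract can be sketched with hashlib. The reference does not state which algorithm hash_file uses, so SHA-256 here is an assumption, and hash_file_sketch is a hypothetical name rather than the library function.

```python
import hashlib
import os
import tempfile


def hash_file_sketch(path, algorithm="sha256", chunk_size=65536):
    """Hash a file in chunks and return the hex digest (algorithm is an assumption)."""
    digest = hashlib.new(algorithm)
    with open(path, "rb") as f:
        # Read fixed-size chunks so large resources are not loaded into memory at once.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Write a small temporary file, hash it, then clean up.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"reference,name\n1,example\n")
    tmp_path = tmp.name

file_digest = hash_file_sketch(tmp_path)
os.remove(tmp_path)
```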

digital_land.collection module

class digital_land.collection.Collection(name=None, directory='./collection')

Bases: object

add_endpoint(entry: dict) → bool
add_source(entry: dict)
add_source_endpoint(entry: dict) → bool

Adds entries to the endpoint and source CSVs, if validation checks pass.

Parameters:
  • entry

Returns:
  Boolean value indicating whether the entries were added successfully.

dataset_resource_map()

a map of resources needed by each dataset in a collection

end_date(entry)
entry_date(entry)
filtered_sources(filter: dict)
static format_date(date_val) → str
load(directory=None, refill_todays_logs=False)
load_log_items(directory=None, log_directory=None, after=None)

Method to load the log store and resource store from log items instead of CSVs. Used when the CSVs don't exist or new log items have been created by running a collector. If 'after' is not None, only log items after the specified date / time will be loaded.
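The effect of the 'after' argument can be sketched as a simple filter. This is not the library's implementation: items_after is a hypothetical helper, the "entry-date" key and ISO 8601 format are assumptions about how a log item records its timestamp, and whether the real comparison is strict or inclusive is not stated in the reference.

```python
from datetime import datetime


def items_after(log_items, after=None):
    """Keep log items dated strictly later than 'after'; keep everything when after is None."""
    if after is None:
        return list(log_items)
    return [
        item
        for item in log_items
        if datetime.fromisoformat(item["entry-date"]) > after
    ]


# Invented sample items, for illustration only.
sample_items = [
    {"endpoint": "a", "entry-date": "2024-01-01T09:00:00"},
    {"endpoint": "b", "entry-date": "2024-03-15T12:30:00"},
]
recent = items_after(sample_items, after=datetime(2024, 2, 1))
```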

pipeline_makerules(specification_dir, pipeline_dir, resource_dir, incremental_loading_override, state_path=None)
recalculate_source_hashes()
resource_endpoints(resource)

the list of endpoints a resource was collected from

resource_organisations(resource)

the list of organisations for which a resource was collected

resource_path(resource)
resource_start_date(resource)

the first date a resource was collected

retire_endpoints_and_sources(collection_df_to_retire) → None

This method should be called from commands.py with the correct input.csv format per unique collection.

Parameters:
  • collection (Collection) -- The Collection object which gets instantiated by a corresponding commands.py function which calls this method.

  • collection_df_to_retire (str) -- A small dataframe of sources and endpoints to be retired within a collection; this should be generated by a function call within commands.py.

save_csv(directory=None)
start_date(entry)
update()
class digital_land.collection.EndpointStore(schema=<digital_land.schema.Schema object>)

Bases: CSVStore

is_not_duplicate(endpoint_item: Item) → bool

Checks whether the given endpoint already exists.

Parameters:
  • endpoint_item

static validate_entry(endpoint_item: Item) → bool

Checks whether the supplied parameter is valid according to business rules.

Returns:
  Boolean

class digital_land.collection.LogStore(*args, **kwargs)

Bases: ItemStore

add_entry(item)
check_item_path(item, in_path)
latest_entry_date()

Gets the latest entry date from the log

load_item(path)
save_item(item, path)
class digital_land.collection.ResourceLogStore(*args, **kwargs)

Bases: CSVStore

load(log: LogStore, source: CSVStore, directory: str = './collection', after: datetime | None = None)

Rebuild or update resource.csv file from the log store.

We cannot assume that all resources are present on the local file system as we also want to keep records of resources we have not collected within the current collector execution due to their end_date elapsing.

If 'after' is not None, only log entries after the given datetime will be loaded (used when updating).

class digital_land.collection.SourceStore(schema=<digital_land.schema.Schema object>)

Bases: CSVStore

static validate_entry(source_item: Item | None = None) → bool

Checks whether the supplied parameter is valid according to business rules.

Returns:
  Boolean

digital_land.collection.isodate(s)
digital_land.collection.resource_path(resource, directory='./collection')
digital_land.collection.resource_url(collection, resource)

digital_land.column_fields module

digital_land.command_arguments module

digital_land.command_arguments.collection_dir(f)
digital_land.command_arguments.column_field_dir(f)
digital_land.command_arguments.config_collections_dir(f)
digital_land.command_arguments.converted_resource_dir(f)
digital_land.command_arguments.dataset_resource_dir(f)
digital_land.command_arguments.endpoint_path(f)
digital_land.command_arguments.input_output_path(f)
digital_land.command_arguments.issue_dir(f)
digital_land.command_arguments.operational_issue_dir(f)
digital_land.command_arguments.organisation_path(f)
digital_land.command_arguments.output_log_dir(f)
digital_land.command_arguments.source_path(f)

digital_land.commands module

digital_land.commands.add_data(csv_file_path, collection_name, collection_dir, pipeline_dir, specification_dir, organisation_path, cache_dir=None)
digital_land.commands.add_endpoints_and_lookups(csv_file_path, collection_name, collection_dir, pipeline_dir, specification_dir, organisation_path, tmp_dir='./var/cache')
Parameters:
  • csv_file_path

  • collection_name

  • collection_dir

  • pipeline_dir

  • specification_dir

  • organisation_path

  • tmp_dir

Returns:

digital_land.commands.add_redirections(csv_file_path, pipeline_dir)
Parameters:
  • csv_file_path

  • pipeline_dir

Returns:

digital_land.commands.assign_entities(resource_file_paths, collection, dataset, organisation, pipeline_dir, specification_dir, organisation_path, endpoints, tmp_dir='./var/cache')

Assigns entities for the given resources in the given collection. The resources must have sources already added to the collection.

Parameters:
  • resource_file_paths

  • collection

  • pipeline_dir

  • specification_dir

  • organisation_path

  • tmp_dir

digital_land.commands.check_and_assign_entities(resource_file_paths, endpoints, collection_name, dataset, organisation, collection_dir, organisation_path, specification_dir, pipeline_dir, input_path=None)
digital_land.commands.collect(endpoint_path, collection_dir, pipeline, refill_todays_logs=False)

fetch the sources listed in the endpoint-url column of the ENDPOINT_PATH CSV file

digital_land.commands.collection_add_source(entry, collection, endpoint_url, collection_dir)

followed by a sequence of optional name and value pairs including the following names: "attribution", "licence", "pipelines", "status", "plugin", "parameters", "start-date", "end-date"

digital_land.commands.collection_list_resources(collection_dir)
digital_land.commands.collection_pipeline_makerules(collection_dir, specification_dir, pipeline_dir, resource_dir, incremental_loading_override, state_path=None)
digital_land.commands.collection_retire_endpoints_and_sources(config_collections_dir, endpoints_sources_to_retire_csv_path)

Retires endpoints and sources based on an input CSV. The input CSV must have the columns: collection, endpoint and source.

Parameters:
  • config_collections_dir -- The directory containing the collections.

  • endpoints_sources_to_retire_csv_path -- The file path to the CSV containing endpoints and sources to retire.
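The expected input shape (columns collection, endpoint and source, handled per unique collection) can be sketched with the standard csv module. This only illustrates reading and grouping such a file: group_rows_by_collection is a hypothetical helper, the sample values are invented, and the command itself may process the file differently.

```python
import csv
import io
from collections import defaultdict

REQUIRED_COLUMNS = {"collection", "endpoint", "source"}


def group_rows_by_collection(csv_text):
    """Validate the header, then group (endpoint, source) pairs by collection."""
    reader = csv.DictReader(io.StringIO(csv_text))
    missing = REQUIRED_COLUMNS - set(reader.fieldnames or [])
    if missing:
        raise ValueError(f"missing columns: {sorted(missing)}")
    grouped = defaultdict(list)
    for row in reader:
        grouped[row["collection"]].append((row["endpoint"], row["source"]))
    return dict(grouped)


# Invented sample rows, for illustration only.
sample = (
    "collection,endpoint,source\n"
    "brownfield-land,abc123,src1\n"
    "tree,def456,src2\n"
    "brownfield-land,ghi789,src3\n"
)
grouped = group_rows_by_collection(sample)
```

Validating the header before iterating gives a clear error for a malformed input file instead of a KeyError part-way through.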

digital_land.commands.collection_save_csv(collection_dir, refill_todays_logs=False)
digital_land.commands.convert(input_path, output_path)
digital_land.commands.dataset_create(input_paths, output_path, organisation_path, pipeline, dataset, specification, issue_dir='issue', column_field_dir='var/column-field', dataset_resource_dir='var/dataset-resource', cache_dir='var/cache', resource_path='collection/resource.csv')
digital_land.commands.dataset_dump(input_path, output_path)
digital_land.commands.dataset_dump_flattened(csv_path, flattened_dir, specification, dataset)
digital_land.commands.dataset_update(input_paths, output_path, organisation_path, pipeline, dataset, specification, issue_dir='issue', column_field_dir='var/column-field', dataset_resource_dir='var/dataset-resource', bucket_name=None, dataset_path=None)

Updates the current state of the SQLite files held in S3 with new resources. dataset_path can be passed in to update a local SQLite file instead of downloading from S3.

digital_land.commands.default_output_path(command, input_path)
digital_land.commands.fetch(url, pipeline)

fetch a single source endpoint URL, and add it to the collection

digital_land.commands.get_resource_unidentified_lookups(input_path: Path, dataset: str, pipeline: Pipeline, specification: Specification, organisations: list = [], tmp_dir: Path | None = None, org_csv_path: Path | None = None, endpoints: list = [])
digital_land.commands.operational_issue_save_csv(operational_issue_dir, dataset)
digital_land.commands.organisation_check(**kwargs)
digital_land.commands.organisation_create(**kwargs)
digital_land.commands.pipeline_run(dataset, pipeline, specification, input_path, output_path, collection_dir, null_path=None, issue_dir=None, operational_issue_dir='performance/operational_issue/', organisation_path=None, save_harmonised=False, column_field_dir=None, dataset_resource_dir=None, converted_resource_dir=None, cache_dir='var/cache', endpoints=[], organisations=[], entry_date='', config_path='var/cache/config.sqlite3', resource=None, output_log_dir=None, converted_path=None)
digital_land.commands.process_data_in_batches(entities, flattened_dir, dataset_name)
digital_land.commands.resource_from_path(path)
digital_land.commands.save_state(specification_dir, collection_dir, pipeline_dir, resource_dir, incremental_loading_override, output_path)
digital_land.commands.validate_and_add_data_input(csv_file_path, collection_name, collection_dir, specification_dir, organisation_path)

digital_land.dataset module

digital_land.fetch module

digital_land.fetch.cache_path(url, filename=None)
digital_land.fetch.fetch(url, filename=None, path=None)

digital_land.log module

class digital_land.log.ColumnFieldLog(dataset='', resource='')

Bases: Log

add(column, field)
fieldnames = ['dataset', 'resource', 'column', 'field']
class digital_land.log.ConvertedResourceLog(dataset='', resource='')

Bases: Log

Failed = 'failed'
Success = 'success'
add(elapsed, status, exception='')
fieldnames = ['dataset', 'resource', 'elapsed', 'status', 'exception']
class digital_land.log.DatasetResourceLog(*args, **kwargs)

Bases: Log

add()
fieldnames = ['dataset', 'resource', 'entry-count', 'line-count', 'mime-type', 'internal-path', 'internal-mime-type']
save(*args, **kwargs)
class digital_land.log.IssueLog(dataset='', resource='')

Bases: Log

add_severity_column(severity_mapping_path)
appendErrorMessage(mapping_path)
apply_entity_map()
fieldnames = ['dataset', 'resource', 'line-number', 'entry-number', 'field', 'entity', 'issue-type', 'value', 'message']
log(issue_type, value, message=None)
log_issue(fieldname, issue_type, value, message=None, line_number=0, entry_number=0, entity=None)
record_entity_map(entry, entity)
class digital_land.log.Log(dataset='', resource='')

Bases: object

add(*args, **kwargs)
save(path=None, f=None)
save_parquet(output_dir)
class digital_land.log.OperationalIssueLog(dataset='', resource='', operational_issue_dir=None)

Bases: IssueLog

get_now()
load(operational_issue_directory=None)
load_log_items(operational_issue_directory=None, after=None)

Method to load the operational issue store from operational issue items instead of CSVs. Used when the CSVs don't exist or new issue items have been created by running the pipeline. If 'after' is not None, only log items after the specified date / time will be loaded.

save(output_dir=None, path=None, f=None)
save_csv(directory=None)
update()
digital_land.log.entry_date()

digital_land.makerules module

class digital_land.makerules.ProcessingOption(value)

Bases: Enum

An enumeration.

PROCESS_ALL = 'all'
PROCESS_NONE = 'none'
PROCESS_PARTIAL = 'partial'
digital_land.makerules.dataset_path(dataset)
digital_land.makerules.get_processing_option(collection, specification_dir, pipeline_dir, resource_dir, incremental_loading_override, state_path)
digital_land.makerules.pipeline_makerules(collection, specification_dir, pipeline_dir, resource_dir, incremental_loading_override, state_path=None)
digital_land.makerules.transformed_path(resource, dataset)

digital_land.organisation module

class digital_land.organisation.Organisation(organisation_path=None, pipeline_dir=None, organisation=None)

Bases: object

get(org: str)

Provide the organisation and get the organisation row. Use the organisation lookup first to check the CURIE you're using.

get_orgs_by_dataset(dataset)

gets a list of dictionaries where each entry represents an organisation

load_organisation()
lookup(organisation)
organisation = {}
organisation_lookup = {}
organisation_path = 'var/cache/organisation.csv'
organisation_uri = {}
pipeline_patch_path = 'pipeline/patch.csv'
digital_land.organisation.lower_uri(value)
digital_land.organisation.uri_basename(value)

digital_land.register module

class digital_land.register.Entry(dict=None, /, **kwargs)

Bases: Item

an ordered item in a register

class digital_land.register.Item(dict=None, /, **kwargs)

Bases: UserDict

hash()
pack()
unpack(data)
class digital_land.register.Record(initlist=None)

Bases: UserList

an ordered list of entries sharing the same key value

class digital_land.register.Register(store)

Bases: object

digital_land.register.hash_value(data)

digital_land.schema module

class digital_land.schema.Field(name)

Bases: object

information about a field

class digital_land.schema.Schema(name)

Bases: object

digital_land.specification module

class digital_land.specification.Specification(path='specification')

Bases: object

base_field(field)
current_fieldnames(schema=None)
dataset_prefix(dataset)
factor_fieldnames()
field_dataset(fieldname)
field_parent(fieldname)
field_prefix(field)
field_type(fieldname)
field_typology(fieldname)
get_category_fields(dataset)
get_dataset_entity_max(dataset)
get_dataset_entity_min(dataset)
get_dataset_typology(dataset)
get_field_datatype_map()
get_field_prefix_map()
get_field_typology_map()
get_odp_collections()
index_field()
index_schema()
intermediate_fieldnames(pipeline)
load_dataset(path)
load_dataset_field(path)
load_dataset_schema(path)
load_datatype(path)
load_field(path)
load_licence(path)
load_odp_collections(path)
load_pipeline(path)
load_schema(path)
load_schema_field(path)
load_typology(path)

digital_land.state module

class digital_land.state.State(data)

Bases: dict

build(collection_dir, pipeline_dir, resource_dir, incremental_loading_override)

Build a state object from the current configuration and code

get_code_hash()
get_dir_hash(exclude=[])
load()

Build a state object from a previously saved state file

save(output_path)

Saves a state object to a file

digital_land.state.compare_state(specification_dir, collection_dir, pipeline_dir, resource_dir, incremental_loading_override, state_path)

Compares the current state against the one in state_path. Returns a list of different elements, or None if they are the same.
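The return convention (a list of differing elements, or None when nothing differs) can be sketched with plain dictionaries. This is a minimal illustration: diff_state is a hypothetical helper, and the keys and hash values shown are invented rather than the fields a real State holds.

```python
def diff_state(current, saved):
    """Return the sorted keys whose values differ, or None when the states match."""
    keys = set(current) | set(saved)
    different = sorted(
        key
        for key in keys
        if key not in current or key not in saved or current[key] != saved[key]
    )
    return different or None


# Invented example states, for illustration only.
saved_state = {"code": "a1", "specification": "b2", "pipeline": "c3"}
current_state = {"code": "a1", "specification": "zz", "pipeline": "c3"}

changed = diff_state(current_state, saved_state)
unchanged = diff_state(saved_state, dict(saved_state))
```

Because "no difference" is None rather than an empty list, callers can use a simple truthiness check on the result.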

digital_land.update module

digital_land.update.add_endpoint(entry, endpoint_register)
digital_land.update.add_source(entry, source_register)
digital_land.update.add_source_endpoint(entry, directory=None, collection=None)
digital_land.update.end_date(entry)
digital_land.update.entry_date(entry)
digital_land.update.recalculate_source_hashes(collection)
digital_land.update.start_date(entry)

Module contents

digital_land.csv_field_size_limit(field_size_limit=9223372036854775807)
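The documented default, 9223372036854775807, equals sys.maxsize on 64-bit Python builds. A helper of this kind can be sketched with the standard csv.field_size_limit call. The back-off loop is an assumption about how to cope with platforms that reject very large limits, and set_csv_field_size_limit is a hypothetical name, not the library function.

```python
import csv
import sys


def set_csv_field_size_limit(field_size_limit=sys.maxsize):
    """Raise the csv module's field size limit, backing off if the platform rejects the value."""
    while True:
        try:
            csv.field_size_limit(field_size_limit)
            return field_size_limit
        except OverflowError:
            # Some platforms store the limit in a 32-bit C long; shrink and retry.
            field_size_limit //= 10


applied = set_csv_field_size_limit()
```

Raising the limit matters for CSVs with very large fields, which would otherwise exceed the csv module's default limit of 131072 characters per field.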