digital_land package
Subpackages
- digital_land.adapter package
- digital_land.datatype package
- Submodules
- digital_land.datatype.address module
- digital_land.datatype.datatype module
- digital_land.datatype.date module
- digital_land.datatype.decimal module
- digital_land.datatype.enum module
- digital_land.datatype.factory module
- digital_land.datatype.flag module
- digital_land.datatype.integer module
- digital_land.datatype.latitude module
- digital_land.datatype.longitude module
- digital_land.datatype.multipolygon module
- digital_land.datatype.organisation module
- digital_land.datatype.point module
- digital_land.datatype.string module
- digital_land.datatype.uri module
- digital_land.datatype.wkt module
- Module contents
- digital_land.expectations package
- Subpackages
- Submodules
- digital_land.expectations.commands module
- digital_land.expectations.exception module
- digital_land.expectations.log module
- digital_land.expectations.operation module
- digital_land.expectations.result module
ExpectationResult
ExpectationResult.act_on_failure()
ExpectationResult.checkpoint
ExpectationResult.data_name
ExpectationResult.dict_for_export()
ExpectationResult.expectation_result
ExpectationResult.from_dict()
ExpectationResult.from_json()
ExpectationResult.issues
ExpectationResult.message
ExpectationResult.passed
ExpectationResult.save_to_file()
ExpectationResult.schema()
ExpectationResult.severity
ExpectationResult.to_dict()
ExpectationResult.to_json()
SeverityEnum
- digital_land.expectations.utils module
- Module contents
- digital_land.package package
- Submodules
- digital_land.package.csv module
- digital_land.package.dataset module
DatasetPackage
DatasetPackage.add_counts()
DatasetPackage.entity_row()
DatasetPackage.entry_date_upsert()
DatasetPackage.insert_entity()
DatasetPackage.load()
DatasetPackage.load_column_fields()
DatasetPackage.load_dataset_resource()
DatasetPackage.load_entities()
DatasetPackage.load_facts()
DatasetPackage.load_issues()
DatasetPackage.load_old_entities()
DatasetPackage.load_transformed()
DatasetPackage.migrate_entity()
- digital_land.package.dataset_parquet module
DatasetParquetPackage
DatasetParquetPackage.analyze_parquet_dir()
DatasetParquetPackage.choose_strategy()
DatasetParquetPackage.close_conn()
DatasetParquetPackage.combine_parquet_files()
DatasetParquetPackage.get_schema()
DatasetParquetPackage.group_parquet_files()
DatasetParquetPackage.load()
DatasetParquetPackage.load_entities()
DatasetParquetPackage.load_entities_range()
DatasetParquetPackage.load_fact_resource()
DatasetParquetPackage.load_facts()
DatasetParquetPackage.load_to_sqlite()
- digital_land.package.organisation module
- digital_land.package.package module
- digital_land.package.sqlite module
SqlitePackage
SqlitePackage.colvalue()
SqlitePackage.commit()
SqlitePackage.connect()
SqlitePackage.create()
SqlitePackage.create_cursor()
SqlitePackage.create_database()
SqlitePackage.create_index()
SqlitePackage.create_indexes()
SqlitePackage.create_table()
SqlitePackage.create_tables()
SqlitePackage.disconnect()
SqlitePackage.drop_index()
SqlitePackage.drop_indexes()
SqlitePackage.execute()
SqlitePackage.field_coltype()
SqlitePackage.get_table_fields()
SqlitePackage.insert()
SqlitePackage.load()
SqlitePackage.load_from_s3()
SqlitePackage.load_join_table()
SqlitePackage.load_table()
SqlitePackage.set_up_connection()
SqlitePackage.spatialite()
colname()
coltype()
- Module contents
- digital_land.phase package
- Submodules
- digital_land.phase.combine module
- digital_land.phase.concat module
- digital_land.phase.convert module
- digital_land.phase.default module
- digital_land.phase.dump module
- digital_land.phase.factor module
- digital_land.phase.filter module
- digital_land.phase.harmonise module
- digital_land.phase.load module
- digital_land.phase.lookup module
- digital_land.phase.map module
- digital_land.phase.migrate module
- digital_land.phase.normalise module
- digital_land.phase.organisation module
- digital_land.phase.parse module
- digital_land.phase.patch module
- digital_land.phase.phase module
- digital_land.phase.pivot module
- digital_land.phase.prefix module
- digital_land.phase.priority module
- digital_land.phase.prune module
- digital_land.phase.reference module
- digital_land.phase.save module
- Module contents
- digital_land.pipeline package
- Submodules
- digital_land.pipeline.main module
EntityNumGen
Lookups
Pipeline
Pipeline.columns()
Pipeline.combine_fields()
Pipeline.compose()
Pipeline.concatenations()
Pipeline.default_fields()
Pipeline.default_values()
Pipeline.file_reader()
Pipeline.filters()
Pipeline.get_pipeline_callback()
Pipeline.load_column()
Pipeline.load_combine_fields()
Pipeline.load_concat()
Pipeline.load_default_fields()
Pipeline.load_default_values()
Pipeline.load_filter()
Pipeline.load_lookup()
Pipeline.load_migrate()
Pipeline.load_patch()
Pipeline.load_redirect_lookup()
Pipeline.load_skip_patterns()
Pipeline.lookups()
Pipeline.migrations()
Pipeline.patches()
Pipeline.reader()
Pipeline.redirect_lookups()
Pipeline.run()
Pipeline.skip_patterns()
chain_phases()
run_pipeline()
- digital_land.pipeline.process module
- Module contents
- digital_land.plugins package
- digital_land.store package
- digital_land.utils package
Submodules
digital_land.api module
- class digital_land.api.API(specification: Specification, url: str = 'https://files.planning.data.gov.uk', cache_dir: str = 'var/cache')
Bases:
object
- download_dataset(dataset: str, overwrite: bool = False, path: str | None = None, extension: Extension = Extension.CSV)
Downloads a dataset in CSV or SQLite3 format.
- dataset: dataset name.
- overwrite: overwrite the file if it already exists (otherwise just returns).
- path: file to download to (otherwise <cache-dir>/dataset/<dataset-name>.<extension>).
- extension: 'csv' or 'sqlite3'; 'csv' by default.
- Returns: None. The file is downloaded to the given path or cache, unless an exception occurs.
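A minimal usage sketch based on the signature above; the specification path and the "conservation-area" dataset name are assumptions used only for illustration:

```python
from digital_land.api import API
from digital_land.specification import Specification

# Load the specification from a local checkout (path is an assumption)
spec = Specification(path="specification")
api = API(specification=spec)

# Download an example dataset as CSV (the default extension) into the cache directory
api.download_dataset("conservation-area")
```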
digital_land.check module
- digital_land.check.duplicate_reference_check(issues=None, csv_path=None)
digital_land.cli module
digital_land.collect module
- class digital_land.collect.Collector(dataset='', collection_dir=None)
Bases:
object
- collect(endpoint_path, refill_todays_logs=False)
- collection_dir_file_hashes(collection_dir=None)
- fetch(url, endpoint=None, log_datetime=datetime.datetime(2025, 8, 20, 15, 28, 23, 263806), end_date='', plugin='', refill_todays_logs=False)
- get(url, log={}, verify_ssl=True, plugin='get')
- log_dir = 'collection/log/'
- log_path(log_datetime, endpoint)
- resource_dir = 'collection/resource/'
- save(path, data, refill_todays_logs=False)
- save_content(content)
- save_log(path, log, refill_todays_logs=False)
- save_resource(content, url, log)
- strip_variable_content(content)
- url_endpoint(url)
- user_agent = 'MHCLG Planning Data Collector'
- class digital_land.collect.FetchStatus(value)
Bases:
Enum
An enumeration.
- ALREADY_FETCHED = 4
- EXPIRED = 2
- FAILED = 5
- HASH_FAILURE = 3
- OK = 1
- digital_land.collect.hash_file(path)
Returns the hash of a file as a hex digest.
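A sketch of running a collector and hashing a collected file; the endpoint CSV path and collection directory below assume the conventional collection layout and are not prescribed by this module:

```python
from digital_land.collect import Collector, hash_file

# Collect every endpoint listed in the endpoint CSV (paths are assumptions)
collector = Collector(collection_dir="collection")
collector.collect("collection/endpoint.csv")

# Hash a file as a hex digest
print(hash_file("collection/endpoint.csv"))
```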
digital_land.collection module
- class digital_land.collection.Collection(name=None, directory='./collection')
Bases:
object
- add_endpoint(entry: dict) bool
- add_source(entry: dict)
- add_source_endpoint(entry: dict) bool
Adds entries to the endpoint and source CSVs, if validation checks pass. Returns a Boolean value indicating whether the entries were added successfully.
- dataset_resource_map()
a map of resources needed by each dataset in a collection
- end_date(entry)
- entry_date(entry)
- filtered_sources(filter: dict)
- static format_date(date_val) str
- load(directory=None, refill_todays_logs=False)
- load_log_items(directory=None, log_directory=None, after=None)
Loads the log store and resource store from log items instead of CSVs. Used when the CSVs don't exist or new log items have been created by running a collector. If 'after' is not None, only log items after the specified date/time are loaded.
- pipeline_makerules(specification_dir, pipeline_dir, resource_dir, incremental_loading_override, state_path=None)
- recalculate_source_hashes()
- resource_endpoints(resource)
the list of endpoints a resource was collected from
- resource_organisations(resource)
the list of organisations for which a resource was collected
- resource_path(resource)
- resource_start_date(resource)
the first date a resource was collected
- retire_endpoints_and_sources(collection_df_to_retire) None
This method should be called from commands.py with the correct input.csv format per unique collection.
- Parameters:
collection (Collection) -- The Collection object instantiated by the commands.py function that calls this method.
collection_df_to_retire (str) -- A small dataframe of sources and endpoints to be retired within a collection; this should be generated by a function call within commands.py.
- save_csv(directory=None)
- start_date(entry)
- update()
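A short sketch of loading a collection and querying it; the directory and the placeholder resource hash are assumptions, and iterating dataset_resource_map() as a mapping is inferred from its description above:

```python
from digital_land.collection import Collection

# Load the collection registers from the default ./collection directory
collection = Collection(directory="./collection")
collection.load()

# Map each dataset in the collection to the resources it needs
for dataset, resources in collection.dataset_resource_map().items():
    print(dataset, len(resources))

# List the endpoints a particular resource was collected from
print(collection.resource_endpoints("<resource-hash>"))  # placeholder hash
```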
- class digital_land.collection.EndpointStore(schema=<digital_land.schema.Schema object>)
Bases:
CSVStore
- class digital_land.collection.LogStore(*args, **kwargs)
Bases:
ItemStore
- add_entry(item)
- check_item_path(item, in_path)
- latest_entry_date()
Gets the latest entry date from the log
- load_item(path)
- save_item(item, path)
- class digital_land.collection.ResourceLogStore(*args, **kwargs)
Bases:
CSVStore
- load(log: LogStore, source: CSVStore, directory: str = './collection', after: datetime | None = None)
Rebuild or update the resource.csv file from the log store.
We cannot assume that all resources are present on the local file system, as we also keep records of resources that were not collected during the current collector execution because their end_date has elapsed.
If 'after' is not None, only log entries after the given datetime will be loaded (used when updating).
- class digital_land.collection.SourceStore(schema=<digital_land.schema.Schema object>)
Bases:
CSVStore
- digital_land.collection.isodate(s)
- digital_land.collection.resource_path(resource, directory='./collection')
- digital_land.collection.resource_url(collection, resource)
digital_land.column_fields module
digital_land.command_arguments module
- digital_land.command_arguments.collection_dir(f)
- digital_land.command_arguments.column_field_dir(f)
- digital_land.command_arguments.config_collections_dir(f)
- digital_land.command_arguments.converted_resource_dir(f)
- digital_land.command_arguments.dataset_resource_dir(f)
- digital_land.command_arguments.endpoint_path(f)
- digital_land.command_arguments.input_output_path(f)
- digital_land.command_arguments.issue_dir(f)
- digital_land.command_arguments.operational_issue_dir(f)
- digital_land.command_arguments.organisation_path(f)
- digital_land.command_arguments.output_log_dir(f)
- digital_land.command_arguments.source_path(f)
digital_land.commands module
- digital_land.commands.add_data(csv_file_path, collection_name, collection_dir, pipeline_dir, specification_dir, organisation_path, cache_dir=None)
- digital_land.commands.add_endpoints_and_lookups(csv_file_path, collection_name, collection_dir, pipeline_dir, specification_dir, organisation_path, tmp_dir='./var/cache')
- Parameters:
csv_file_path
collection_name
collection_dir
pipeline_dir
specification_dir
organisation_path
tmp_dir
- Returns:
- digital_land.commands.add_redirections(csv_file_path, pipeline_dir)
- Parameters:
csv_file_path
pipeline_dir
- Returns:
- digital_land.commands.assign_entities(resource_file_paths, collection, dataset, organisation, pipeline_dir, specification_dir, organisation_path, endpoints, tmp_dir='./var/cache')
Assigns entities for the given resources in the given collection. The resources must already have sources added to the collection.
- digital_land.commands.check_and_assign_entities(resource_file_paths, endpoints, collection_name, dataset, organisation, collection_dir, organisation_path, specification_dir, pipeline_dir, input_path=None)
- digital_land.commands.collect(endpoint_path, collection_dir, pipeline, refill_todays_logs=False)
fetch the sources listed in the endpoint-url column of the ENDPOINT_PATH CSV file
- digital_land.commands.collection_add_source(entry, collection, endpoint_url, collection_dir)
Adds a source to the collection. The entry may be followed by a sequence of optional name and value pairs including the following names: "attribution", "licence", "pipelines", "status", "plugin", "parameters", "start-date", "end-date".
- digital_land.commands.collection_list_resources(collection_dir)
- digital_land.commands.collection_pipeline_makerules(collection_dir, specification_dir, pipeline_dir, resource_dir, incremental_loading_override, state_path=None)
- digital_land.commands.collection_retire_endpoints_and_sources(config_collections_dir, endpoints_sources_to_retire_csv_path)
Retires endpoints and sources based on an input CSV. Note that the input CSV must contain the columns: collection, endpoint and source.
- Parameters:
config_collections_dir -- The directory containing the collections.
endpoints_sources_to_retire_csv_path -- The filepath to the CSV containing the endpoints and sources to retire.
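A sketch of calling this command directly; both paths are assumptions for illustration, and the input CSV must contain the columns noted above:

```python
from digital_land.commands import collection_retire_endpoints_and_sources

# input.csv columns: collection, endpoint, source
collection_retire_endpoints_and_sources(
    config_collections_dir="config/collections",   # assumed layout
    endpoints_sources_to_retire_csv_path="input.csv",
)
```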
- digital_land.commands.collection_save_csv(collection_dir, refill_todays_logs=False)
- digital_land.commands.convert(input_path, output_path)
- digital_land.commands.dataset_create(input_paths, output_path, organisation_path, pipeline, dataset, specification, issue_dir='issue', column_field_dir='var/column-field', dataset_resource_dir='var/dataset-resource', cache_dir='var/cache', resource_path='collection/resource.csv')
- digital_land.commands.dataset_dump(input_path, output_path)
- digital_land.commands.dataset_dump_flattened(csv_path, flattened_dir, specification, dataset)
- digital_land.commands.dataset_update(input_paths, output_path, organisation_path, pipeline, dataset, specification, issue_dir='issue', column_field_dir='var/column-field', dataset_resource_dir='var/dataset-resource', bucket_name=None, dataset_path=None)
Updates the current state of the SQLite files held in S3 with new resources. dataset_path can be passed in to update a local SQLite file instead of downloading from S3.
- digital_land.commands.default_output_path(command, input_path)
- digital_land.commands.fetch(url, pipeline)
fetch a single source endpoint URL, and add it to the collection
- digital_land.commands.get_resource_unidentified_lookups(input_path: Path, dataset: str, pipeline: Pipeline, specification: Specification, organisations: list = [], tmp_dir: Path | None = None, org_csv_path: Path | None = None, endpoints: list = [])
- digital_land.commands.operational_issue_save_csv(operational_issue_dir, dataset)
- digital_land.commands.organisation_check(**kwargs)
- digital_land.commands.organisation_create(**kwargs)
- digital_land.commands.pipeline_run(dataset, pipeline, specification, input_path, output_path, collection_dir, null_path=None, issue_dir=None, operational_issue_dir='performance/operational_issue/', organisation_path=None, save_harmonised=False, column_field_dir=None, dataset_resource_dir=None, converted_resource_dir=None, cache_dir='var/cache', endpoints=[], organisations=[], entry_date='', config_path='var/cache/config.sqlite3', resource=None, output_log_dir=None, converted_path=None)
- digital_land.commands.process_data_in_batches(entities, flattened_dir, dataset_name)
- digital_land.commands.resource_from_path(path)
- digital_land.commands.save_state(specification_dir, collection_dir, pipeline_dir, resource_dir, incremental_loading_override, output_path)
- digital_land.commands.validate_and_add_data_input(csv_file_path, collection_name, collection_dir, specification_dir, organisation_path)
digital_land.dataset module
digital_land.fetch module
- digital_land.fetch.cache_path(url, filename=None)
- digital_land.fetch.fetch(url, filename=None, path=None)
digital_land.log module
- class digital_land.log.ColumnFieldLog(dataset='', resource='')
Bases:
Log
- add(column, field)
- fieldnames = ['dataset', 'resource', 'column', 'field']
- class digital_land.log.ConvertedResourceLog(dataset='', resource='')
Bases:
Log
- Failed = 'failed'
- Success = 'success'
- add(elapsed, status, exception='')
- fieldnames = ['dataset', 'resource', 'elapsed', 'status', 'exception']
- class digital_land.log.DatasetResourceLog(*args, **kwargs)
Bases:
Log
- add()
- fieldnames = ['dataset', 'resource', 'entry-count', 'line-count', 'mime-type', 'internal-path', 'internal-mime-type']
- save(*args, **kwargs)
- class digital_land.log.IssueLog(dataset='', resource='')
Bases:
Log
- add_severity_column(severity_mapping_path)
- appendErrorMessage(mapping_path)
- apply_entity_map()
- fieldnames = ['dataset', 'resource', 'line-number', 'entry-number', 'field', 'entity', 'issue-type', 'value', 'message']
- log(issue_type, value, message=None)
- log_issue(fieldname, issue_type, value, message=None, line_number=0, entry_number=0, entity=None)
- record_entity_map(entry, entity)
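A small sketch of recording an issue with IssueLog and saving it as CSV; the dataset, resource, issue-type, values and output path are example assumptions, not values prescribed by the class:

```python
from digital_land.log import IssueLog

issues = IssueLog(dataset="conservation-area", resource="abc123")  # example identifiers
issues.log_issue(
    "reference",          # fieldname
    "invalid",            # issue-type (example value)
    "REF-001",            # offending value
    message="reference is not valid",
    line_number=2,
    entry_number=1,
)
issues.save(path="issue/conservation-area/abc123.csv")  # example output path
```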
- class digital_land.log.Log(dataset='', resource='')
Bases:
object
- add(*args, **kwargs)
- save(path=None, f=None)
- save_parquet(output_dir)
- class digital_land.log.OperationalIssueLog(dataset='', resource='', operational_issue_dir=None)
Bases:
IssueLog
- get_now()
- load(operational_issue_directory=None)
- load_log_items(operational_issue_directory=None, after=None)
Loads the operational issue store from operational issue items instead of CSVs. Used when the CSVs don't exist or new issue items have been created by running the pipeline. If 'after' is not None, only log items after the specified date/time are loaded.
- save(output_dir=None, path=None, f=None)
- save_csv(directory=None)
- update()
- digital_land.log.entry_date()
digital_land.makerules module
- class digital_land.makerules.ProcessingOption(value)
Bases:
Enum
An enumeration.
- PROCESS_ALL = 'all'
- PROCESS_NONE = 'none'
- PROCESS_PARTIAL = 'partial'
- digital_land.makerules.dataset_path(dataset)
- digital_land.makerules.get_processing_option(collection, specification_dir, pipeline_dir, resource_dir, incremental_loading_override, state_path)
- digital_land.makerules.pipeline_makerules(collection, specification_dir, pipeline_dir, resource_dir, incremental_loading_override, state_path=None)
- digital_land.makerules.transformed_path(resource, dataset)
digital_land.organisation module
- class digital_land.organisation.Organisation(organisation_path=None, pipeline_dir=None, organisation=None)
Bases:
object
- get(org: str)
Provide the organisation and get the organisation row; uses the organisation lookup first to check the CURIE you're using.
- get_orgs_by_dataset(dataset)
gets a list of dictionaries where each entry represents an organisation
- load_organisation()
- lookup(organisation)
- organisation = {}
- organisation_lookup = {}
- organisation_path = 'var/cache/organisation.csv'
- organisation_uri = {}
- pipeline_patch_path = 'pipeline/patch.csv'
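A sketch of looking up organisations; the organisation CSV path, the CURIE and the dataset name below are example assumptions:

```python
from digital_land.organisation import Organisation

# Assumes the organisation CSV has already been downloaded to the default cache path
orgs = Organisation(organisation_path="var/cache/organisation.csv")

# Look up an organisation row by CURIE ("local-authority:XYZ" is a placeholder)
row = orgs.get("local-authority:XYZ")

# List organisations for an example dataset
print(orgs.get_orgs_by_dataset("conservation-area"))
```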
- digital_land.organisation.lower_uri(value)
- digital_land.organisation.uri_basename(value)
digital_land.register module
- class digital_land.register.Entry(dict=None, /, **kwargs)
Bases:
Item
an ordered item in a register
- class digital_land.register.Item(dict=None, /, **kwargs)
Bases:
UserDict
- hash()
- pack()
- unpack(data)
- class digital_land.register.Record(initlist=None)
Bases:
UserList
an ordered list of entries sharing the same key value
- class digital_land.register.Register(store)
Bases:
object
- digital_land.register.hash_value(data)
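A brief sketch of working with register items; Item is a UserDict, so it is constructed like a dict, and the field names below are examples only:

```python
from digital_land.register import Item

# Item behaves like a dict; hash() returns a content hash and pack() serialises the item
item = Item({"reference": "REF-001", "name": "Example"})
print(item.hash())
print(item.pack())
```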
digital_land.schema module
- class digital_land.schema.Field(name)
Bases:
object
information about a field
- class digital_land.schema.Schema(name)
Bases:
object
digital_land.specification module
- class digital_land.specification.Specification(path='specification')
Bases:
object
- base_field(field)
- current_fieldnames(schema=None)
- dataset_prefix(dataset)
- factor_fieldnames()
- field_dataset(fieldname)
- field_parent(fieldname)
- field_prefix(field)
- field_type(fieldname)
- field_typology(fieldname)
- get_category_fields(dataset)
- get_dataset_entity_max(dataset)
- get_dataset_entity_min(dataset)
- get_dataset_typology(dataset)
- get_field_datatype_map()
- get_field_prefix_map()
- get_field_typology_map()
- get_odp_collections()
- index_field()
- index_schema()
- intermediate_fieldnames(pipeline)
- load_dataset(path)
- load_dataset_field(path)
- load_dataset_schema(path)
- load_datatype(path)
- load_field(path)
- load_licence(path)
- load_odp_collections(path)
- load_pipeline(path)
- load_schema(path)
- load_schema_field(path)
- load_typology(path)
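A minimal sketch of querying the specification; the default path is taken from the constructor signature, while the field and dataset names are example assumptions:

```python
from digital_land.specification import Specification

# Load the specification from a local checkout (default path "specification")
spec = Specification()

# Query field and dataset metadata
print(spec.field_type("entry-date"))                   # example field name
print(spec.get_dataset_typology("conservation-area"))  # example dataset
```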
digital_land.state module
- class digital_land.state.State(data)
Bases:
dict
- build(collection_dir, pipeline_dir, resource_dir, incremental_loading_override)
Build a state object from the current configuration and code
- get_code_hash()
- get_dir_hash(exclude=[])
- load()
Build a state object from a previously saved state file
- save(output_path)
Saves a state object to a file
- digital_land.state.compare_state(specification_dir, collection_dir, pipeline_dir, resource_dir, incremental_loading_override, state_path)
Compares the current state against the one in state_path. Returns a list of different elements, or None if they are the same.
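A sketch of comparing the current configuration against a saved state file; all paths and the override flag are assumptions for illustration:

```python
from digital_land.state import compare_state

diff = compare_state(
    specification_dir="specification",
    collection_dir="collection",
    pipeline_dir="pipeline",
    resource_dir="collection/resource",
    incremental_loading_override=False,
    state_path="state.json",   # example path to a previously saved state
)
print(diff)  # a list of changed elements, or None if nothing has changed
```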
digital_land.update module
- digital_land.update.add_endpoint(entry, endpoint_register)
- digital_land.update.add_source(entry, source_register)
- digital_land.update.add_source_endpoint(entry, directory=None, collection=None)
- digital_land.update.end_date(entry)
- digital_land.update.entry_date(entry)
- digital_land.update.recalculate_source_hashes(collection)
- digital_land.update.start_date(entry)
Module contents
- digital_land.csv_field_size_limit(field_size_limit=9223372036854775807)
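A one-line sketch of the module-level helper; raising the csv field size limit (the default shown above is the maximum 64-bit integer) lets rows with very large fields, such as long geometry strings, be read without errors:

```python
from digital_land import csv_field_size_limit

# Raise the csv module's field size limit before reading large CSV files
csv_field_size_limit()
```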