Source code for roak_sdk.semantics.assets.borehole

from __future__ import annotations

from collections import defaultdict
from datetime import datetime

from roak_sdk.clients.borehole_client import BoreholeClient
from roak_sdk.clients.base_client import millis_to_datetime
from roak_sdk.semantics.asset import Asset
from roak_sdk.config import DEFAULT_BOREHOLE_FEEDS
from roak_sdk.clients.client_registry import ClientRegistry
    

class Borehole(Asset):
    """Represents a Borehole asset.

    Extends ``Asset`` with access to depth data, fetched through the
    ``BoreholeClient`` obtained from the client registry.
    """

    DEFAULT_FEEDS = DEFAULT_BOREHOLE_FEEDS
    DEFAULT_TIMEFRAME_DAYS = 1  # Boreholes often need very recent data

    def __init__(self, data: dict, registry: ClientRegistry) -> None:
        """Initialize Borehole from API response data.

        Args:
            data: Raw asset payload, forwarded unchanged to ``Asset``.
            registry: Client registry used to look up the ``BoreholeClient``.
        """
        super().__init__(data, registry)
        # Dedicated client used for the borehole-specific depth-data calls.
        self._borehole_client = registry.get(BoreholeClient)

    def get_depth_data(self) -> list[dict]:
        """Fetch depth data for this borehole.

        The client's feed-centric response is pivoted into one record per
        (timestamp, depth) pair and gaps are then forward-filled from
        earlier records.

        Returns:
            list[dict]: A list of records sorted by timestamp, then depth,
            each containing:
                - timestamp: The time of the measurement, taken from the
                  entry's ``time`` field
                - datetime: Result of ``millis_to_datetime(timestamp)``, or
                  None when the timestamp is missing
                - depth: The depth of the measurement
                - <feedname>: The value for each feed at that
                  timestamp/depth (absent only when the feed has no value
                  at this or any earlier record)
        """
        # Treat a missing/None response like an empty one instead of crashing.
        # NOTE(review): self.guid is presumably set by Asset - confirm.
        raw_depth_data = self._borehole_client.get_depth_data(self.guid) or []
        feed_names = {
            feed.get("name") for feed in raw_depth_data if feed.get("name")
        }
        result = self._pivot_depth_data(raw_depth_data)
        return self._forward_fill_values(result, feed_names)

    def _pivot_depth_data(self, raw_data: list[dict]) -> list[dict]:
        """Transform feed-centric depth data to timestamp/depth-centric format.

        Args:
            raw_data: List of dicts with name, unitName, unitSymbol, and
                values (where values is a list of {time, depth, value}).
                Feeds without a name are ignored, and a missing or None
                ``values`` entry is treated as empty.

        Returns:
            List of dicts with timestamp, datetime, depth, and each feed as
            <feedname>: <value>, sorted by timestamp and then depth.
        """
        # Group values by (timestamp, depth)
        grouped = defaultdict(dict)
        for feed in raw_data:
            feed_name = feed.get("name")
            if not feed_name:
                # A nameless feed would otherwise be stored under a None key;
                # get_depth_data ignores such feeds as well.
                continue
            # "or []" also covers feeds whose "values" key is present but None.
            for entry in feed.get("values") or []:
                key = (entry.get("time"), entry.get("depth"))
                grouped[key][feed_name] = entry.get("value")

        # Build the result list
        result = []
        for (timestamp, depth), feed_values in grouped.items():
            record = {
                "timestamp": timestamp,
                "datetime": (
                    millis_to_datetime(timestamp)
                    if timestamp is not None
                    else None
                ),
                "depth": depth,
            }
            record.update(feed_values)
            result.append(record)

        # Sort by timestamp, then by depth; missing values sort as 0 so that
        # None is never compared against a number.
        result.sort(key=lambda x: (x.get("timestamp") or 0, x.get("depth") or 0))
        return result

    def _forward_fill_values(
        self, records: list[dict], feed_names: set[str]
    ) -> list[dict]:
        """Forward-fill empty values from previous records.

        A value counts as empty when it is missing, None, or "". The input
        records are not modified; shallow copies are returned.

        Args:
            records: List of record dicts to fill, in the order in which
                filling should proceed.
            feed_names: Set of feed names to check and fill.

        Returns:
            New list of dicts with empty values forward-filled. A feed that
            has had no non-empty value yet is left as it was.
        """
        result = []
        last_values = {}
        for record in records:
            filled_record = record.copy()
            for feed_name in feed_names:
                value = filled_record.get(feed_name)
                if value is None or value == "":
                    # Use previous value if available
                    if feed_name in last_values:
                        filled_record[feed_name] = last_values[feed_name]
                else:
                    # Update last known value
                    last_values[feed_name] = value
            result.append(filled_record)
        return result