Collection
opengate_data.collection
opengate_data.collection.iot_bulk_collection
IotBulkCollectionBuilder Objects
class IotBulkCollectionBuilder()Collection Bulk Builder
add_device_datastream_datapoints
def add_device_datastream_datapoints(
device_id: str,
datastream_id: str,
datapoints: list[tuple[int | float | bool | dict | list | str,
None | datetime | int, None | str, None | str]],
feed: str | None = None) -> "IotBulkCollectionBuilder"Add the datastream identifier and a list of datapoints with their value and at for data collection.
add_datastream_datapoints(“datastream_identifier”, [(value, at, source, source_info)])
Multiple datastreams can be grouped under a single identifier
Arguments:
datastream_idstr - The identifier for the datastream to which the datapoints will be added.datapointslist[tuple[int | float | bool | dict | list, None | datetime | int, None | datetime | int]] - A list of tuples where each tuple represents a datapoint. Each tuple contains the datapoint value and an optional timestamp (‘at’):value- Collected valueat- Number with the time in miliseconds from epoch of the measurement. If this field is None, the platform will assign the server current time to the datapoint whe data is received.
Returns:
IotCollectionBuilder- Returns itself to allow for method chaining.
Example:
builder.add_datastream_datapoints("datastream_identifier_1", [(value1, datetime.now()), (value2, None, "HTTP-Basic", "OK")])
builder.add_datastream_datapoints("datastream_identifier_2", [(value3, None), (value4, 1431602523123)])add_device_datastream_datapoints_with_from
def add_device_datastream_datapoints_with_from(
device_id: str,
datastream_id: str,
datapoints: list[tuple[int | float | bool | dict | list | str,
None | datetime | int, None | datetime | int,
None | str, None | str]],
feed: str | None = None) -> "IotBulkCollectionBuilder"Add the datastream identifier and a list of datapoints with their value, at and from for data collection.
add_datastream_datapoints_with_from(“datastream_identifier”, [(value, at, from, Source, SourceInfo)])
Multiple datastreams can be grouped under a single identifier
Arguments:
datastream_idstr - The identifier for the datastream to which the datapoints will be added.datapointslist[tuple[int | float | bool | dict, None | datetime | int, None | datetime | int]] - A list of tuples where each tuple represents a datapoint. Each tuple contains the datapoint value and an optional timestamp (‘at’) (‘from):value- Collected valueat- Number with the time in miliseconds from epoch of the measurement. If this field is None, the platform will assign the server current time to the datapoint whe data is received.from- Number with the time in miliseconds from epoch of the start period of measurement. This indicates that value is the same within this time interval (from, at).
Returns:
IotCollectionBuilder- Returns itself to allow for method chaining.
Example:
builder.add_datastream_datapoints_with_from("datastream_identifier_1", [(value, 1431602523123, None), (value, None, None, "HTTP-Basic", "OK")])
builder.add_datastream_datapoints_with_from("datastream_identifier_2", [(value, None, datetime.now()), (value, 1431602523123, datetime.now())])from_dataframe
def from_dataframe(df: pd.DataFrame) -> "IotBulkCollectionBuilder"Processes a DataFrame to extract device, data and datapoints, and adds them to the payload.
Arguments:
dfpd.DataFrame - The DataFrame containing the device data and datapoints. The DataFrame is expected to have columns that match the expected structure for device datastreams and datapoints.
Returns:
IotBulkCollectionBuilder- Returns itself to allow for method chaining.
Example:
import pandas as pd
df = pd.DataFrame({
- `'device_id'` - ['device'], ['device2'],
- `'datastream'` - ['1'],['2'],
- `'value'` - [value, value2],
- `'at'` - [datetime.now(), 2000]
})
builder.from_dataframe(df)from_spreadsheet
def from_spreadsheet(
path: str, sheet_name_index: int | str) -> "IotBulkCollectionBuilder"Loads data from a spreadsheet, processes it, and adds the resulting device data and datapoints to the payload. This method is particularly useful for bulk data operations where data is stored in spreadsheet format.
Arguments:
pathstr - The file path to the spreadsheet to load.sheet_name_indexint | str - The sheet name or index to load from the spreadsheet.
Returns:
IotBulkCollectionBuilder- Returns itself to allow for method chaining.
Example:
builder.from_spreadsheet("file.xslx", "sheet_name)
builder.from_spreadsheet("file.xslx", 1)build
def build() -> 'IotBulkCollectionBuilder'Finalizes the construction of the entities search configuration.
This method prepares the builder to execute the collection by ensuring all necessary configurations are set and validates the overall integrity of the build. It should be called before executing the collection to ensure that the configuration is complete and valid.
The build process involves checking that mandatory fields such as the device identifier are set. It also ensures that method calls that are incompatible with each other (like build and build_execute) are not both used.
Returns:
IotBulkCollectionBuilder- Returns itself to allow for method chaining, enabling further actions likeexecute.
Raises:
ValueError- If required configurations are missing or if incompatible methods are used together.
Example:
builder.build()build_execute
def build_execute(include_payload=False)This method is a shortcut that combines building and executing in a single step.
Returns:
dict- A dictionary containing the execution response which includes the status code and potentially other metadata about the execution.
Raises:
ValueError- Ifbuildhas already been called on this builder instance.
Example:
import pandas as pd
from datetime import datetime
data = {
- `"device_id"` - ['entity'],
- `"data_stream_id"` - ["device.temperature.value"],
- `"origin_device_identifier"` - ['entity2'],
- `"value"` - [40],
- `"version"` - ["4.0.0"],
- `"path"` - ["entityTesting3"],
- `"at"` - [datetime.now()],
- `"from"` - [datetime.now()],
}
new_iot_bulk_collection_builder().from_dataframe(df).from_spreadsheet("collect.xslx",0).add_device_datastream_datapoints_with_from("device_identifier", "device.temperature.value", [(300, datetime.now(), datetime.now())])
.add_device_datastream_datapoints("entity", "device.temperature.value", [(300, datetime.now())])
.build_execute()to_dict
def to_dict() -> dictThis method is used to retrieve the entire payload that has been constructed by the builder. The payload includes all devices, their respective datastreams, and the datapoints that have been added to each datastream. This is particularly useful for inspecting the current state of the payload after all configurations and additions have been made, but before any execution actions (like sending data to a server) are taken.
Returns:
dict- A dictionary representing the current state of the payload within the IotBulkCollectionBuilder. This dictionary includes all devices, datastreams, and datapoints that have been configured.
Raises:
Exception- If the build method was not called before this method.
Example:
builder.to_dict()execute
def execute(include_payload=False)Executes the IoT collection based on the current configuration of the builder.
Arguments:
include_payloadbool - Determine if the payload should be included in the response.
Returns:
dict- A dictionary containing the results of the execution, including success messages for each device ID if the data was successfully sent, or error messages detailing what went wrong.
Raises:
Exception- Ifbuild()has not been called beforeexecute(), or if it was not the last method invoked prior toexecute().
Example:
import pandas as pd
from datetime import datetime
data = {
- `"device_id"` - ['entity', entity2],
- `"data_stream_id"` - ["device.temperature.value", "device.name"],
- `"origin_device_identifier"` - ['entity2', None],
- `"value"` - [40, "Name"],
- `"version"` - ["4.0.0", "2.0.0],
- `"path"` - ["entityTesting3", entityTesting4],
- `"at"` - [datetime.now(), datetime.now()],
- `"from"` - [datetime.now(), datetime.now()],
}
builder.new_iot_bulk_collection_builder().from_dataframe(df).from_spreadsheet("collect.xslx",0).add_device_datastream_datapoints_with_from("device_identifier", "device.temperature.value", [(300, datetime.now(), datetime.now())])
.add_device_datastream_datapoints("entity", "device.temperature.value", [(300, datetime.now())])
.build().execute())opengate_data.collection.iot_collection
IotCollectionBuilder Objects
class IotCollectionBuilder()Iot Collection Builder
with_device_identifier
def with_device_identifier(device_identifier: str) -> "IotCollectionBuilder"Add the device identifier to the constructor and validates the type.
Arguments:
device_identifierstr - The unique identifier for the device.
Returns:
IotCollectionBuilder- Returns itself to allow for method chaining.
Example:
builder.with_device_identifier('device_identifier')with_origin_device_identifier
def with_origin_device_identifier(
origin_device_identifier: str) -> "IotCollectionBuilder"Origin Device Identifier in case of be different that the device Identifier that sends information (included in the URI).
Add the origin_device_identifier to the constructor and validates the type.
Arguments:
origin_device_identifierstr - The unique identifier for the device.
Returns:
IotCollectionBuilder- Returns itself to allow for method chaining.
Example:
builder.with_origin_device_identifier('origin_device_identifier')with_version
def with_version(version: str) -> "IotCollectionBuilder"Indicates the version of the structure
Add the version to the constructor and validates the type.
Arguments:
versionstr - The version string to be set.
Returns:
IotCollectionBuilder- Returns itself to allow for method chaining.
Example:
builder.with_version('1.0.0')with_path
def with_path(path: list[str]) -> "IotCollectionBuilder"Identifier of the gateway or gateways that has been used by the asset for sending the information.
This method adds the path gateway to the constructor and validates the type.
Arguments:
pathlist - The list of gateway identifiers.
Returns:
IotCollectionBuilder- Returns itself to allow for method chaining.
Example:
builder.with_path(["path"])with_device
def with_device(device: str) -> "IotCollectionBuilder"Device Identifier in case of be different that the device Identifier that sends information (included in the URI).
Arguments:
devicestr - The unique identifier for the device.
Returns:
IotCollectionBuilder- Returns itself to allow for method chaining.
Example:
builder.with_device("sub-device-001")with_trustedboot
def with_trustedboot(trustedboot: str) -> "IotCollectionBuilder"Indicates that a validation of the Trusted_boot type is required, it is not necessary to enter the value of the field but if you enter it, the entire message received by the platform will compare the value of TrustedBoot with the provisioned value, if they are different the message will not be collected.
Add the trustedboot to the constructor and validates the type.
Arguments:
trustedbootstr - The unique identifier for the device.
Returns:
IotCollectionBuilder- Returns itself to allow for method chaining.
Example:
builder.with_trustedboot("trustedboot")add_datastream_datapoints
def add_datastream_datapoints(
datastream_id: str,
datapoints: list[tuple[int | float | bool | dict | list | str,
None | datetime | int, None | str, None | str]],
feed: str | None = None) -> "IotCollectionBuilder"Add the datastream identifier and a list of datapoints with their value and at for data collection.
add_datastream_datapoints(“datastream_identifier”, [(value, at, source, source_info)])
Multiple datastreams can be grouped under a single identifier
Arguments:
datastream_idstr - The identifier for the datastream to which the datapoints will be added.datapointslist[tuple[int | float | bool | dict | list, None | datetime | int, None | datetime | int]] - A list of tuples where each tuple represents a datapoint. Each tuple contains the datapoint value and an optional timestamp (‘at’):value- Collected valueat- Number with the time in miliseconds from epoch of the measurement. If this field is None, the platform will assign the server current time to the datapoint whe data is received.
Returns:
IotCollectionBuilder- Returns itself to allow for method chaining.
Example:
builder.add_datastream_datapoints("datastream_identifier_1", [(value1, datetime.now()), (value2, None, "HTTP-Basic", "OK")])
builder.add_datastream_datapoints("datastream_identifier_2", [(value3, None), (value4, 1431602523123)])add_datastream_datapoints_with_from
def add_datastream_datapoints_with_from(
datastream_id: str,
datapoints: list[tuple[int | float | bool | dict | list | str,
None | datetime | int, None | datetime | int,
None | str, None | str]],
feed: str | None = None) -> "IotCollectionBuilder"Add the datastream identifier and a list of datapoints with their value, at and from for data collection.
add_datastream_datapoints_with_from(“datastream_identifier”, [(value, at, from, Source, SourceInfo)])
Multiple datastreams can be grouped under a single identifier
Arguments:
datastream_idstr - The identifier for the datastream to which the datapoints will be added.datapointslist[tuple[int | float | bool | dict, None | datetime | int, None | datetime | int]] - A list of tuples where each tuple represents a datapoint. Each tuple contains the datapoint value and an optional timestamp (‘at’) (‘from):value- Collected valueat- Number with the time in miliseconds from epoch of the measurement. If this field is None, the platform will assign the server current time to the datapoint whe data is received.from- Number with the time in miliseconds from epoch of the start period of measurement. This indicates that value is the same within this time interval (from, at).
Returns:
IotCollectionBuilder- Returns itself to allow for method chaining.
Example:
builder.add_datastream_datapoints_with_from("datastream_identifier_1", [(value, 1431602523123, None), (value, None, None, "HTTP-Basic", "OK")])
builder.add_datastream_datapoints_with_from("datastream_identifier_2", [(value, None, datetime.now()), (value, 1431602523123, datetime.now())])from_dict
def from_dict(payload: dict[str, Any]) -> "IotCollectionBuilder"Constructs the collection configuration from a dictionary input.
This method dynamically applies builder methods based on the keys in the input dictionary. It should be used after the build() method has been called to ensure that the builder is in a proper state to accept configuration from a dictionary.
Arguments:
payloaddict[str, Any] - The dictionary containing the configuration Args.
Returns:
IotCollectionBuilder- Returns itself to allow for method chaining.
Raises:
ValueError- If required keys are missing in the payload or if the ‘datastreams’ field is empty.
Example:
builder.build().from_dict({
- `'version'` - '1.0.0',
- `'path'` - ["mews"],
- `'trustedBoot'` - "trustedBoot",
- `'origin_device_identifier'` - 'device123',
- `'datastreams'` - [
- `{'id'` - 'temp', 'datapoints': [(22, 1609459200000)]}
]
})build
def build() -> "IotCollectionBuilder"Finalizes the construction of the IoT collection configuration.
This method prepares the builder to execute the collection by ensuring all necessary configurations are set and validates the overall integrity of the build. It should be called before executing the collection to ensure that the configuration is complete and valid.
The build process involves checking that mandatory fields such as the device identifier are set. It also ensures that method calls that are incompatible with each other (like build and build_execute) are not both used.
Returns:
IotCollectionBuilder- Returns itself to allow for method chaining, enabling further actions likeexecute.
Raises:
ValueError- If required configurations are missing or if incompatible methods are used together.
Notes:
This method should be used as a final step before execute to prepare the IoT collection configuration. It does not modify the state but ensures that the builder’s state is ready for execution.
Example:
builder.build()to_dict
def to_dict() -> dictThis method is used to retrieve the entire payload that has been constructed by the builder. The payload includes all devices, their respective datastreams, and the datapoints that have been added to each datastream. This is particularly useful for inspecting the current state of the payload after all configurations and additions have been made, but before any execution actions (like sending data to a server) are taken.
Returns:
dict- A dictionary representing the current state of the payload within the IotCollectionBuilder. This dictionary includes all devices, datastreams, and datapoints that have been configured.
Raises:
Exception- If the build() method was not called before this method.
Example:
builder.build().to_dict()build_execute
def build_execute(include_payload: bool = False)Executes the IoT collection immediately after building the configuration.
This method is a shortcut that combines building and executing in a single step. It should be used when you want to build and execute the configuration without modifying the builder state in between these operations.
It first validates the build configuration and then executes the collection if the validation is successful.
Arguments:
include_payloadbool - Determine if the payload should be included in the response.
Returns:
dict- A dictionary containing the execution response which includes the status code and potentially other metadata about the execution.
Raises:
ValueError- Ifbuildhas already been called on this builder instance, indicating thatbuild_executeis being incorrectly used afterbuild.Exception- If there are issues during the execution process, including network or API errors.
Example:
new_iot_collection_builder().with_device_identifier("entity").with_version("2.0.0").add_datastream_datapoints("device.temperature.value", [(100, None), (50, datetime.now())]).build_execute(True)execute
def execute(include_payload: bool = False)Executes the IoT collection based on the current configuration of the builder.
Arguments:
include_payloadbool - Determine if the payload should be included in the response.
Returns:
Dict- A dictionary containing the execution response which includes the status code and, optionally, the payload. If an error occurs, a string describing the error is returned.
Raises:
Exception- Ifbuild()has not been called beforeexecute(), or if it was not the last method invoked prior toexecute().
Example:
builder.with_device_identifier("device_identifier").with_version("2.0.0").add_datastream_datapoints("device.temperature.value", [(100, None), (50, datetime.now())]).build().execute(True)opengate_data.collection.iot_pandas_collection
PandasIotCollectionBuilder Objects
class PandasIotCollectionBuilder()Builder class to process a pandas DataFrame into IoT collections and send them to a specified endpoint.
from_dataframe
def from_dataframe(df: pd.DataFrame) -> "PandasIotCollectionBuilder"Set the input DataFrame that contains the IoT data to process.
The DataFrame must include a ‘device_id’ and ‘at’ column. Additional columns are considered as potential datastream values. If ‘at’ is empty or None, a timestamp will be assigned.
Arguments:
dfpd.DataFrame - The DataFrame with ‘device_id’ and ‘at’ columns.
Returns:
PandasIotCollectionBuilder- The current builder instance.
Raises:
Exception- If ‘device_id’ or ‘at’ column is missing in the DataFrame.
with_columns
def with_columns(columns: list[str]) -> "PandasIotCollectionBuilder"Specify the columns from the DataFrame that should be included as datastreams in the IoT payload.
If this method is not called, all available datastream columns (except required/optional ones) are used. If it is called, only the specified columns will be considered.
Arguments:
columnslist[str] - The list of column names to include as datastreams.
Returns:
PandasIotCollectionBuilder- The current builder instance.
Raises:
Exception- If any specified column does not exist in the DataFrame.
with_max_bytes_per_request
def with_max_bytes_per_request(max_bytes: int) -> "PandasIotCollectionBuilder"Set the maximum number of bytes per request for IoT collection.
This controls how the payload is batched when sending to the endpoint.
Arguments:
max_bytesint - The maximum request size in bytes.
Returns:
PandasIotCollectionBuilder- The current builder instance.
build
def build() -> "PandasIotCollectionBuilder"Build the request payload after the DataFrame and columns have been configured.
This method processes the DataFrame, converting columns into the appropriate IoT datastream format. It must be called before execute() if not using build_execute().
Returns:
PandasIotCollectionBuilder- The current builder instance.
Raises:
Exception- If build() and build_execute() are used together.
build_execute
def build_execute(
include_payload: bool = False) -> dict[str, list[dict[str, Any]]]Build the payload and execute the request in a single step.
This method is a shortcut for users who want to build and then immediately execute. It cannot be used together with build() or execute().
Arguments:
include_payloadbool - Whether to include the payload in the result. Defaults to False.
Returns:
dict[str, list[dict[str, Any]]]: The results of the IoT collection request.
Raises:
ValueError- If build_execute() is used together with build() or execute().
execute
def execute(include_payload: bool = False) -> Union[str, pd.DataFrame]Execute the request after building it with build() or build_execute().
If include_payload is True, it returns a JSON string with the payload and results. Otherwise, it returns a pandas DataFrame with a status column summarizing the results.
Arguments:
include_payloadbool - Whether to include the payload in the result. Defaults to False.
Returns:
Union[str, pd.DataFrame]: The result of the IoT collection request.
Raises:
Exception- If the required method invocation order is not respected.