# Source code for clappform

from __future__ import annotations

from itertools import chain
from typing import TYPE_CHECKING

import grpc

from .proto.clappform.client.v1 import (
    actionflow_pb2_grpc,
    collection_pb2_grpc,
    query_pb2_grpc,
)
from .proto.clappform.data.v1 import (
    aggregate_pb2_grpc,
    delete_pb2_grpc,
    insert_pb2_grpc,
    update_pb2_grpc,
)
from .utils import default_options

if TYPE_CHECKING:
    from typing import Callable, Iterator, Optional

    from .proto.clappform.client.v1 import (
        actionflow_pb2,
        collection_pb2,
        query_pb2,
    )
    from .proto.clappform.data.v1 import (
        aggregate_pb2,
        delete_pb2,
        insert_pb2,
        update_pb2,
    )
    from .proto.clappform.v1 import commons_pb2
    from .typedefs import GrpcChannelOptions, GrpcMetadata, RpcCallOptions

# Metadata
# Package-level dunder attributes describing this distribution.
__version__ = "5.0.0-alpha10"
__author__ = "Clappform B.V."
__email__ = "info@clappform.com"
__license__ = "MIT"
# Assigned explicitly; this sets the module docstring at import time.
__doc__ = "Clappform Python API wrapper"


# lol = actionflow_pb2_grpc.ActionflowManagementStub


class GrpcBase:
    """
    Base class for gRPC connections.

    Opens a gRPC channel to ``target`` on construction and stores the
    metadata that is attached to every call made through
    :meth:`_default_kwargs`. Call :meth:`close` when the connection is no
    longer needed.

    :param target: The target server address.
    :type target: str
    :param token: The authentication token.
    :type token: str
    :param location: The client location (subdomain).
    :type location: str
    :param credentials_fn: A callable that returns gRPC channel credentials,
        defaults to :py:obj:`grpc.ssl_channel_credentials`. Pass ``None``
        to open an insecure channel.
    :type credentials_fn: Optional[Callable[[], grpc.ChannelCredentials]],
        optional
    :param options_fn: A callable that returns gRPC channel options,
        defaults to :func:`clappform.utils.default_options`.
    :type options_fn: Optional[Callable[
        [], :py:data:`~clappform.typedefs.GrpcChannelOptions`]
        ], optional
    :param compression_fn: A callable that returns gRPC compression
        options, defaults to None.
    :type compression_fn: Optional[Callable[ [], grpc.Compression ]],
        optional

    :ivar channel: The gRPC channel.
    :vartype channel: grpc.Channel
    :ivar metadata: The metadata for gRPC calls.
    :vartype metadata: :py:data:`~clappform.typedefs.GrpcMetadata`
    """

    def __init__(
        self,
        target: str,
        token: str,
        location: str,
        credentials_fn: Optional[
            Callable[[], grpc.ChannelCredentials]
        ] = grpc.ssl_channel_credentials,
        options_fn: Optional[
            Callable[[], GrpcChannelOptions]
        ] = default_options,
        compression_fn: Optional[Callable[[], grpc.Compression]] = None,
    ) -> None:
        """
        Initializes the GrpcBase class.

        :param target: The target server address.
        :type target: str
        :param token: The authentication token.
        :type token: str
        :param location: The client location (subdomain).
        :type location: str
        :param credentials_fn: A callable that returns gRPC channel
            credentials, defaults to
            :py:obj:`grpc.ssl_channel_credentials`.
        :type credentials_fn: Optional[Callable[
            [], grpc.ChannelCredentials
            ]], optional
        :param options_fn: A callable that returns gRPC channel options,
            defaults to :func:`clappform.utils.default_options`.
        :type options_fn: Optional[Callable[
            [], :py:data:`~clappform.typedefs.GrpcChannelOptions`
            ]], optional
        :param compression_fn: A callable that returns gRPC compression
            options, defaults to None.
        :type compression_fn: Optional[Callable[
            [], grpc.Compression
            ]], optional
        """
        # Each factory is optional; a missing factory yields ``None`` so the
        # corresponding gRPC default is used.
        credentials = credentials_fn() if credentials_fn else None
        options = options_fn() if options_fn else None
        compression = compression_fn() if compression_fn else None

        if credentials is None:
            # We assume we want an insecure connection.
            self.channel = grpc.insecure_channel(target, options, compression)
        else:
            self.channel = grpc.secure_channel(
                target, credentials, options, compression
            )

        # Sent with every call; per-call metadata is appended after these.
        self.metadata: GrpcMetadata = (
            ("location", location),
            ("x-api-key", token),
            ("deadline", "3600s"),
        )

    def close(self) -> None:
        """
        Closes the underlying gRPC channel.

        Delegates to ``self.channel.close()``. The instance must not be
        used for further calls afterwards.
        """
        self.channel.close()

    def _metadata(self, metadata: GrpcMetadata) -> GrpcMetadata:
        """
        Combines the class metadata with additional metadata.

        The instance metadata comes first, followed by ``metadata``.

        :param metadata: Additional metadata to be included.
        :return: Combined metadata.
        :rtype: :py:data:`~clappform.typedefs.GrpcMetadata`
        """
        return tuple(chain(self.metadata, metadata))

    def _default_kwargs(
        self,
        timeout: Optional[float] = None,
        metadata: Optional[GrpcMetadata] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> RpcCallOptions:
        """
        Creates the default keyword arguments for gRPC calls.

        :param timeout: Timeout for the gRPC call.
        :param metadata: Additional metadata for the gRPC call; it is
            appended to the instance metadata.
        :param credentials: Call credentials for the gRPC call.
        :param wait_for_ready: Whether to wait for the channel to be ready.
        :param compression: Compression options for the gRPC call.
        :return: Dictionary of gRPC call options.
        :rtype: :py:data:`~clappform.typedefs.RpcCallOptions`
        """
        return {
            "timeout": timeout,
            "metadata": self._metadata(metadata or ()),
            "credentials": credentials,
            "wait_for_ready": wait_for_ready,
            "compression": compression,
        }
class Data(GrpcBase):
    """
    Data class for interacting with gRPC services.

    Inherits from :class:`~clappform.GrpcBase`.

    Every RPC method accepts optional per-call keyword arguments
    (``timeout``, ``metadata``, ``credentials``, ``wait_for_ready``,
    ``compression``) which are passed through
    :meth:`~clappform.GrpcBase._default_kwargs` to the stub call.

    :param token: The authentication token.
    :type token: str
    :param location: The client location (subdomain).
    :type location: str
    :param target: The target server address, defaults to
        "data.clappform.com:50051".
    :type target: str, optional
    :param credentials_fn: A callable that returns gRPC channel credentials,
        defaults to :py:obj:`grpc.ssl_channel_credentials`.
    :type credentials_fn: Optional[Callable[
        [], grpc.ChannelCredentials
        ]], optional
    :param options_fn: A callable that returns gRPC channel options,
        defaults to :func:`clappform.utils.default_options`.
    :type options_fn: Optional[Callable[
        [], :py:data:`~clappform.typedefs.GrpcChannelOptions`
        ]], optional
    :param compression_fn: A callable that returns gRPC compression
        options, defaults to None.
    :type compression_fn: Optional[Callable[
        [], grpc.Compression
        ]], optional

    :ivar aggregate_stub: Stub for aggregate management.
    :vartype aggregate_stub: aggregate_pb2_grpc.AggregateManagementStub
    :ivar insert_stub: Stub for insert management.
    :vartype insert_stub: insert_pb2_grpc.InsertManagementStub
    :ivar delete_stub: Stub for delete management.
    :vartype delete_stub: delete_pb2_grpc.DeleteManagementStub
    :ivar update_stub: Stub for update management.
    :vartype update_stub: update_pb2_grpc.UpdateManagementStub
    """

    def __init__(
        self,
        token: str,
        location: str,
        target: str = "data.clappform.com:50051",
        credentials_fn: Optional[
            Callable[[], grpc.ChannelCredentials]
        ] = grpc.ssl_channel_credentials,
        options_fn: Optional[
            Callable[[], GrpcChannelOptions]
        ] = default_options,
        compression_fn: Optional[Callable[[], grpc.Compression]] = None,
    ) -> None:
        """
        Initializes the Data class.

        :param token: The authentication token.
        :type token: str
        :param location: The client location (subdomain).
        :type location: str
        :param target: The target server address, defaults to
            "data.clappform.com:50051".
        :type target: str, optional
        :param credentials_fn: A callable that returns gRPC channel
            credentials, defaults to
            :py:obj:`grpc.ssl_channel_credentials`.
        :type credentials_fn: Optional[Callable[
            [], grpc.ChannelCredentials
            ]], optional
        :param options_fn: A callable that returns gRPC channel options,
            defaults to :func:`clappform.utils.default_options`.
        :type options_fn: Optional[Callable[
            [], :py:data:`~clappform.typedefs.GrpcChannelOptions`
            ]], optional
        :param compression_fn: A callable that returns gRPC compression
            options, defaults to None.
        :type compression_fn: Optional[Callable[
            [], grpc.Compression
            ]], optional
        """
        super().__init__(
            target,
            token,
            location,
            credentials_fn=credentials_fn,
            options_fn=options_fn,
            compression_fn=compression_fn,
        )
        # All stubs share the single channel opened by GrpcBase.
        self.aggregate_stub = aggregate_pb2_grpc.AggregateManagementStub(
            self.channel
        )
        self.insert_stub = insert_pb2_grpc.InsertManagementStub(self.channel)
        self.delete_stub = delete_pb2_grpc.DeleteManagementStub(self.channel)
        self.update_stub = update_pb2_grpc.UpdateManagementStub(self.channel)

    def aggregate(
        self,
        request: aggregate_pb2.AggregateStreamRequest,
        **kwargs: RpcCallOptions,
    ) -> Iterator[aggregate_pb2.AggregateResponse]:
        """
        Sends an aggregate request to the gRPC service and yields responses.

        :param request: The aggregate request to be sent.
        :type request: :class:`~clappform.proto.clappform.data.v1.\
aggregate_pb2.AggregateStreamRequest`
        :param kwargs: Optional per-call options (``timeout``,
            ``metadata``, ``credentials``, ``wait_for_ready``,
            ``compression``).
        :return: An iterator over aggregate responses.
        :rtype: Iterator[:class:`~clappform.proto.clappform.data.v1.\
aggregate_pb2.AggregateResponse`]
        :raises grpc.RpcError: If an RPC error occurs

        This method sends an aggregate request to the gRPC service via the
        `AggregateStream` method of the `AggregateManagementStub`. The
        responses from the service are yielded one by one. Because this is
        a generator, the stub is not called until iteration starts.

        Example usage:

        .. code-block:: python

            import clappform
            from clappform.proto.clappform.data.v1 import aggregate_pb2

            # Create an instance of the Data class
            d = clappform.Data(token="your_token", location="your_location")

            # Create an AggregateStreamRequest
            request = aggregate_pb2.AggregateStreamRequest(
                # Fill in the request details
            )

            # Iterate over the responses
            for response in d.aggregate(request):
                print(response)

        .. note::
            Ensure that the `clappform.proto.clappform.data.v1.aggregate_pb2`
            module is imported and available in your code to use the
            `AggregateStreamRequest` and `AggregateResponse` classes.
        """
        yield from self.aggregate_stub.AggregateStream(
            request, **self._default_kwargs(**kwargs)
        )

    def insert_many(
        self,
        request_iterator: Iterator[insert_pb2.InsertRequest],
        **kwargs: RpcCallOptions,
    ) -> Iterator[insert_pb2.InsertResponse]:
        """
        Sends multiple insert requests to the gRPC service and yields
        responses.

        :param request_iterator: An iterator over insert requests.
        :type request_iterator: Iterator[:class:`~clappform.proto.clappform.\
data.v1.insert_pb2.InsertRequest`]
        :param kwargs: Optional per-call options (``timeout``,
            ``metadata``, ``credentials``, ``wait_for_ready``,
            ``compression``).
        :return: An iterator over insert responses.
        :rtype: Iterator[:class:`~clappform.proto.clappform.data.v1.\
insert_pb2.InsertResponse`]
        :raises grpc.RpcError: If an RPC error occurs

        This method sends multiple insert requests to the gRPC service via
        the `InsertMany` method of the `InsertManagementStub`. The responses
        from the service are yielded one by one. Because this is a
        generator, the stub is not called until iteration starts.

        Example usage:

        .. code-block:: python

            import clappform
            from clappform.proto.clappform.data.v1 import insert_pb2

            # Create an instance of the Data class
            d = clappform.Data(token="your_token", location="your_location")

            # Create an iterator of InsertRequest
            request_iterator = iter([
                insert_pb2.InsertRequest(
                    # Fill in the request details
                ),
                # Add more requests as needed
            ])

            # Iterate over the responses
            for response in d.insert_many(request_iterator):
                print(response)

        .. note::
            Ensure that the `clappform.proto.clappform.data.v1.insert_pb2`
            module is imported and available in your code to use the
            `InsertRequest` and `InsertResponse` classes.
        """
        yield from self.insert_stub.InsertMany(
            request_iterator,
            **self._default_kwargs(**kwargs),
        )

    def delete_many_by_oids(
        self,
        request: delete_pb2.DeleteRequestOids,
        **kwargs: RpcCallOptions,
    ) -> commons_pb2.Message:
        """
        Sends a delete request to the gRPC service to delete multiple items
        by OIDs.

        :param request: The delete request containing OIDs.
        :type request: :class:`~clappform.proto.clappform.data.v1.delete_pb2.\
DeleteRequestOids`
        :param kwargs: Optional per-call options (``timeout``,
            ``metadata``, ``credentials``, ``wait_for_ready``,
            ``compression``).
        :return: A message indicating the result of the delete operation.
        :rtype: :class:`~clappform.proto.clappform.v1.commons_pb2.Message`
        :raises grpc.RpcError: If an RPC error occurs

        This method sends a delete request to the gRPC service via the
        `DeleteManyByOids` method of the `DeleteManagementStub`. The
        response from the service indicates the result of the operation.

        Example usage:

        .. code-block:: python

            import clappform
            from clappform.proto.clappform.data.v1 import delete_pb2

            # Create an instance of the Data class
            d = clappform.Data(token="your_token", location="your_location")

            # Create a DeleteRequestOids
            request = delete_pb2.DeleteRequestOids(
                # Fill in the request details
            )

            # Send the delete request and get the response
            response = d.delete_many_by_oids(request)
            print(response)

        .. note::
            Ensure that the `clappform.proto.clappform.data.v1.delete_pb2`
            and `clappform.proto.clappform.v1.commons_pb2` modules are
            imported and available in your code to use the
            `DeleteRequestOids` and `Message` classes.
        """
        return self.delete_stub.DeleteManyByOids(
            request, **self._default_kwargs(**kwargs)
        )

    def update_replace_many(
        self,
        request: update_pb2.UpdateRequestByOid,
        **kwargs: RpcCallOptions,
    ) -> update_pb2.UpdateRequest:
        """
        Sends an update request to the gRPC service to replace multiple
        items.

        :param request: The update request.
        :type request: :class:`~clappform.proto.clappform.data.v1.update_pb2.\
UpdateRequestByOid`
        :param kwargs: Optional per-call options (``timeout``,
            ``metadata``, ``credentials``, ``wait_for_ready``,
            ``compression``).
        :return: The update request response.
        :rtype: :class:`~clappform.proto.clappform.data.v1.update_pb2.\
UpdateRequest`
        :raises grpc.RpcError: If an RPC error occurs

        This method sends an update request to the gRPC service via the
        `ReplaceMany` method of the `UpdateManagementStub`.

        Example usage:

        .. code-block:: python

            import clappform
            from clappform.proto.clappform.data.v1 import update_pb2

            # Create an instance of the Data class
            d = clappform.Data(token="your_token", location="your_location")

            # Create an UpdateRequestByOid
            request = update_pb2.UpdateRequestByOid(
                # Fill in the request details
            )

            # Send the update request and get the response
            response = d.update_replace_many(request)
            print(response)

        .. note::
            Ensure that the `clappform.proto.clappform.data.v1.update_pb2`
            module is imported and available in your code to use the
            `UpdateRequestByOid` and `UpdateRequest` classes.
        """
        return self.update_stub.ReplaceMany(
            request, **self._default_kwargs(**kwargs)
        )
class Client(GrpcBase):
    """
    Client class for interfacing with various gRPC services provided by
    Clappform.

    Inherits from :class:`~clappform.GrpcBase`.

    Every RPC method accepts optional per-call keyword arguments
    (``timeout``, ``metadata``, ``credentials``, ``wait_for_ready``,
    ``compression``) which are passed through
    :meth:`~clappform.GrpcBase._default_kwargs` to the stub call.

    :param token: The authentication token.
    :type token: str
    :param location: The client location (subdomain).
    :type location: str
    :param target: The target server address, defaults to
        "data.clappform.com:50051".
    :type target: str, optional
    :param credentials_fn: A callable that returns gRPC channel credentials,
        defaults to :py:obj:`grpc.ssl_channel_credentials`.
    :type credentials_fn: Optional[Callable[
        [], grpc.ChannelCredentials
        ]], optional
    :param options_fn: A callable that returns gRPC channel options,
        defaults to :func:`clappform.utils.default_options`.
    :type options_fn: Optional[Callable[
        [], :py:data:`~clappform.typedefs.GrpcChannelOptions`
        ]], optional
    :param compression_fn: A callable that returns gRPC compression
        options, defaults to None.
    :type compression_fn: Optional[Callable[
        [], grpc.Compression
        ]], optional

    :ivar actionflow_stub: Stub for actionflow management.
    :vartype actionflow_stub: actionflow_pb2_grpc.ActionflowManagementStub
    :ivar collection_stub: Stub for collection management.
    :vartype collection_stub: collection_pb2_grpc.CollectionManagementStub
    :ivar query_stub: Stub for query management.
    :vartype query_stub: query_pb2_grpc.QueryManagementStub
    """

    def __init__(
        self,
        token: str,
        location: str,
        target: str = "data.clappform.com:50051",
        credentials_fn: Optional[
            Callable[[], grpc.ChannelCredentials]
        ] = grpc.ssl_channel_credentials,
        options_fn: Optional[
            Callable[[], GrpcChannelOptions]
        ] = default_options,
        compression_fn: Optional[Callable[[], grpc.Compression]] = None,
    ) -> None:
        """
        Initializes the Client class.

        :param token: The authentication token.
        :type token: str
        :param location: The client location (subdomain).
        :type location: str
        :param target: The target server address, defaults to
            "data.clappform.com:50051".
        :type target: str, optional
        :param credentials_fn: A callable that returns gRPC channel
            credentials, defaults to
            :py:obj:`grpc.ssl_channel_credentials`.
        :type credentials_fn: Optional[Callable[
            [], grpc.ChannelCredentials
            ]], optional
        :param options_fn: A callable that returns gRPC channel options,
            defaults to :func:`clappform.utils.default_options`.
        :type options_fn: Optional[Callable[
            [], :py:data:`~clappform.typedefs.GrpcChannelOptions`
            ]], optional
        :param compression_fn: A callable that returns gRPC compression
            options, defaults to None.
        :type compression_fn: Optional[Callable[
            [], grpc.Compression
            ]], optional
        """
        super().__init__(
            target,
            token,
            location,
            credentials_fn=credentials_fn,
            options_fn=options_fn,
            compression_fn=compression_fn,
        )
        # All stubs share the single channel opened by GrpcBase.
        self.actionflow_stub = actionflow_pb2_grpc.ActionflowManagementStub(
            self.channel
        )
        self.collection_stub = collection_pb2_grpc.CollectionManagementStub(
            self.channel
        )
        self.query_stub = query_pb2_grpc.QueryManagementStub(self.channel)

    def actionflow_start(
        self,
        request: actionflow_pb2.StartActionflow,
        **kwargs: RpcCallOptions,
    ) -> actionflow_pb2.StartActionflowResponse:
        """
        Starts an actionflow using the gRPC service.

        :param request: The start actionflow request.
        :type request: :class:`~clappform.proto.clappform.client.v1.\
actionflow_pb2.StartActionflow`
        :param kwargs: Optional per-call options (``timeout``,
            ``metadata``, ``credentials``, ``wait_for_ready``,
            ``compression``).
        :return: The response for starting an actionflow.
        :rtype: :class:`~clappform.proto.clappform.client.v1.actionflow_pb2.\
StartActionflowResponse`
        :raises grpc.RpcError: If an RPC error occurs

        Example:

        .. code-block:: python

            import clappform
            from clappform.proto.clappform.client.v1 import actionflow_pb2

            # Create an instance of the Client class
            c = clappform.Client(
                token="your-token", location="your-location"
            )

            # Create a StartActionflow
            request = actionflow_pb2.StartActionflow(...)

            # Send the start request and get the response
            response = c.actionflow_start(request)
            print(response)

        .. note::
            Ensure that the
            `clappform.proto.clappform.client.v1.actionflow_pb2` module is
            imported and available in your code to use the
            `StartActionflow` and `StartActionflowResponse` classes.
        """
        return self.actionflow_stub.Start(
            request, **self._default_kwargs(**kwargs)
        )

    def collection_get(
        self,
        request: commons_pb2.Read,
        **kwargs: RpcCallOptions,
    ) -> collection_pb2.Collection:
        """
        Gets a collection using the gRPC service.

        :param request: The read request for the collection.
        :type request: :class:`~clappform.proto.clappform.v1.commons_pb2.Read`
        :param kwargs: Optional per-call options (``timeout``,
            ``metadata``, ``credentials``, ``wait_for_ready``,
            ``compression``).
        :return: The collection response.
        :rtype: :class:`~clappform.proto.clappform.client.v1.collection_pb2.\
Collection`
        :raises grpc.RpcError: If an RPC error occurs

        Example:

        .. code-block:: python

            import clappform
            from clappform.proto.clappform.v1 import commons_pb2

            # Create an instance of the Client class
            c = clappform.Client(
                token="your-token", location="your-location"
            )

            # Create a Read
            request = commons_pb2.Read(...)

            # Send the read request and get the response
            response = c.collection_get(request)
            print(response)

        .. note::
            Ensure that the
            `clappform.proto.clappform.client.v1.collection_pb2` and
            `clappform.proto.clappform.v1.commons_pb2` modules are imported
            and available in your code to use the `Read` and `Collection`
            classes.
        """
        return self.collection_stub.Get(
            request, **self._default_kwargs(**kwargs)
        )

    def query_get(
        self,
        request: commons_pb2.Read,
        **kwargs: RpcCallOptions,
    ) -> query_pb2.Query:
        """
        Gets a query using the gRPC service.

        :param request: The read request for the query.
        :type request: :class:`~clappform.proto.clappform.v1.commons_pb2.Read`
        :param kwargs: Optional per-call options (``timeout``,
            ``metadata``, ``credentials``, ``wait_for_ready``,
            ``compression``).
        :return: The query response.
        :rtype: :class:`~clappform.proto.clappform.client.v1.query_pb2.Query`
        :raises grpc.RpcError: If an RPC error occurs

        Example:

        .. code-block:: python

            import clappform
            from clappform.proto.clappform.v1 import commons_pb2

            # Create an instance of the Client class
            c = clappform.Client(
                token="your-token", location="your-location"
            )

            # Create a Read
            request = commons_pb2.Read(...)

            # Send the read request and get the response
            response = c.query_get(request)
            print(response)

        .. note::
            Ensure that the `clappform.proto.clappform.client.v1.query_pb2`
            and `clappform.proto.clappform.v1.commons_pb2` modules are
            imported and available in your code to use the `Read` and
            `Query` classes.
        """
        return self.query_stub.Get(request, **self._default_kwargs(**kwargs))