Source code for clappform.utils

"""
This module defines type aliases and a data structure used for configuring gRPC
RPC call options.
"""

from __future__ import annotations

import json
import tempfile
from typing import TYPE_CHECKING

from .proto.clappform.data.v1 import insert_pb2

if TYPE_CHECKING:
    from collections.abc import Iterator

    from pandas import DataFrame

    from .typedefs import GrpcChannelOptions


[docs] def default_options( max_attemps: int = 5, initial_backoff: str = "0.1s", max_backoff: str = "1s", backoff_multiplier: int = 2, retryable_status_codes: list[str] | None = None, ) -> GrpcChannelOptions: """ Generates default gRPC channel options with retry configuration. This function creates a list of gRPC channel options that include retry policies and service configuration. It constructs a service configuration JSON for gRPC retry policies based on the provided parameters. :param max_attemps: The maximum number of retry attempts for a failed RPC call. Default is 5. :type max_attemps: int :param initial_backoff: The initial backoff duration between retry attempts, specified as a string with units (e.g., "0.1s"). Default is "0.1s". :type initial_backoff: str :param max_backoff: The maximum backoff duration between retry attempts, specified as a string with units (e.g., "1s"). Default is "1s". :type max_backoff: str :param backoff_multiplier: The multiplier applied to the backoff duration for each retry attempt. Default is 2. :type backoff_multiplier: int :param retryable_status_codes: A list of gRPC status codes that are considered retryable. If not provided, defaults to ["UNAVAILABLE"]. :type retryable_status_codes: Optional[list[str]] :return: A list of gRPC channel options as tuples, where each tuple consists of an option name and its value. :rtype: GrpcChannelOptions :example: >>> options = default_options() >>> print(options) [("grpc.enable_retries", 1), ("grpc.service_config", '{"methodConfig": [{"\ name": [{}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "0.1s", "\ maxBackoff": "1s", "backoffMultiplier": 2, "retryableStatusCodes": ["\ UNAVAILABLE"]}}]}')] This function is used to configure gRPC channel options with retry policies for better resilience in network operations. """ if retryable_status_codes is None: retryable_status_codes = ["UNAVAILABLE"] service_config_json = json.dumps( { "methodConfig": [ { "name": [{}], "retryPolicy": { "maxAttempts": max_attemps, "initialBackoff": initial_backoff, "maxBackoff": max_backoff, "backoffMultiplier": backoff_multiplier, "retryableStatusCodes": retryable_status_codes, }, } ] } ) options: list[tuple[str, int | str]] = [ ("grpc.keepalive_time_ms", 120000), ("grpc.keepalive_timeout_ms", 20000), ("grpc.keepalive_permit_without_calls", True), ("grpc.http2.max_pings_without_data", 0), ("grpc.http2.min_time_between_pings_ms", 120000), ("grpc.http2.min_ping_interval_without_data_ms", 120000), ("grpc.enable_retries", 1), ("grpc.service_config", service_config_json), ] return options
[docs] def insert_many_dataframe( collection: str, df: DataFrame, size: int = 2500, encoding: str = "utf-8", ) -> Iterator[insert_pb2.InsertRequest]: """ Yields InsertRequest objects for chunks of a DataFrame. This function splits a pandas DataFrame into smaller chunks and yields `InsertRequest` objects containing JSON-encoded data from each chunk. :param collection: The name of the collection where data will be inserted. :type collection: str :param df: The DataFrame to be split into chunks and inserted. :type df: pandas.DataFrame :param size: The size of each chunk. Defaults to 2500. :type size: int, optional :param encoding: The encoding to be used for JSON data. Defaults to "utf-8". :type encoding: str, optional :return: An iterator over `InsertRequest` objects containing the JSON-encoded data. :rtype: Iterator[:class:`~clappform.proto.clappform.data.v1.insert_pb2.\ InsertRequest`] :raises ValueError: If the DataFrame is empty. """ if df.empty: raise ValueError("The DataFrame is empty") for chunk in [df[i : i + size] for i in range(0, df.shape[0], size)]: # `TemporaryFile` And `force_ascii=False` force the chunck to be # `UTF-8` encoded. with tempfile.TemporaryFile(mode="w+", encoding=encoding) as fd: chunk.to_json(fd, orient="records", force_ascii=False) fd.seek(0) # Reset pointer to begin of file for reading. yield insert_pb2.InsertRequest( data=fd.read().encode(encoding=encoding), collection=collection, )