# Source code for sic_framework.core.utils

"""
utils.py

This module contains utility functions for the SIC framework.
"""

import binascii
import getpass
import os
import socket
import sys
import shutil
import hashlib
import base64
import six

# True when the running interpreter's major version is below 3.
PYTHON_VERSION_IS_2 = sys.version_info[0] < 3

# NOTE(review): presumably the exact text other code looks for to detect that
# a component manager has started -- confirm against the callers.
MAGIC_STARTED_COMPONENT_MANAGER_TEXT = "Started component manager"


def get_ip_adress():
    """
    Look up the IP address of the local network interface.

    A UDP socket is pointed at an arbitrary address and the local address
    that the socket ends up with is read back from it.

    :return: The local IP address, or ``"127.0.0.1"`` if the lookup fails.
    :rtype: str
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    probe.settimeout(0)
    try:
        # The target address does not have to be reachable.
        probe.connect(("10.254.254.254", 1))
        return probe.getsockname()[0]
    except Exception:
        return "127.0.0.1"
    finally:
        probe.close()
def ping_server(server, port, timeout=3):
    """
    Check whether a TCP connection to a server can be established.

    The timeout is applied to the probing socket only, so the process-wide
    default socket timeout is left untouched. The socket is always closed,
    whether or not the connection succeeds.

    :param server: The host name or IP address to connect to.
    :type server: str
    :param port: The TCP port to connect to.
    :type port: int
    :param timeout: The connection timeout in seconds.
    :type timeout: float
    :return: True if the connection succeeded, False on any failure.
    :rtype: bool
    """
    s = None
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(timeout)
        s.connect((server, port))
    except OSError as error:
        print("OSError when trying to connect to device: {}".format(error))
        return False
    except Exception as e:
        # Anything else (e.g. an invalid port value) also means the server
        # could not be reached, so report it as a failed ping.
        print("Encountered exception while trying to connect to device: {}".format(e))
        return False
    else:
        return True
    finally:
        # Release the socket on every path, including failed connects.
        if s is not None:
            s.close()
def get_username_hostname_ip():
    """
    Build an identifier from the current user name, host name and IP address.

    :return: The three values joined with underscores, as ``user_host_ip``.
    :rtype: str
    """
    parts = (getpass.getuser(), socket.gethostname(), get_ip_adress())
    return "_".join(parts)
def ensure_binary(s, encoding="utf-8", errors="strict"):
    """
    Coerce **s** to ``six.binary_type`` (backported from a later six release).

    For Python 2:
      - `unicode` -> encoded to `str`
      - `str` -> `str`

    For Python 3:
      - `str` -> encoded to `bytes`
      - `bytes` -> `bytes`

    :param s: The text or binary string to convert.
    :type s: six.text_type or six.binary_type
    :param encoding: The encoding used when **s** is text.
    :type encoding: str
    :param errors: The error handling strategy used when encoding.
    :type errors: str
    :return: **s** as a binary string.
    :rtype: six.binary_type
    :raises TypeError: If **s** is neither a text nor a binary string.
    """
    if isinstance(s, six.text_type):
        return s.encode(encoding, errors)
    if not isinstance(s, six.binary_type):
        raise TypeError("not expecting type '%s'" % type(s))
    # Already binary: hand it back untouched.
    return s
def str_if_bytes(data):
    """
    Decode bytes to text and pass every other value through unchanged.

    Used for channel-name compatibility between Python 2 and Python 3, where
    a redis channel ``b'name'`` differs from ``"name"``. Undecodable bytes
    are substituted with the Unicode replacement character.

    :param data: The value to convert.
    :type data: str or bytes
    :return: The UTF-8 decoded text if **data** is bytes, otherwise **data**.
    :rtype: str
    """
    return data.decode("utf-8", errors="replace") if isinstance(data, bytes) else data
def random_hex(nbytes=8):
    """
    Generate a random hexadecimal token from ``os.urandom``.

    The token holds two hex digits per random byte. On Python 3 the result
    of ``binascii.hexlify`` is a ``bytes`` object rather than ``str``.

    :param nbytes: The number of random bytes to draw.
    :type nbytes: int
    :return: The hex representation of the random bytes.
    :rtype: str on Python 2, bytes on Python 3
    """
    entropy = os.urandom(nbytes)
    return binascii.hexlify(entropy)
def is_sic_instance(obj, cls):
    """
    Name-based replacement for ``isinstance``.

    ``isinstance`` does not work reliably on objects that went through
    pickling, so this compares class names instead: the check passes when
    any class in the object's method resolution order has the same
    ``__name__`` as **cls**.
    https://stackoverflow.com/questions/620844/why-do-i-get-unexpected-behavior-in-python-isinstance-after-pickling

    :param obj: The object to inspect.
    :type obj: object
    :param cls: The class whose name is looked for.
    :type cls: type
    :return: True if a class named like **cls** is in the object's MRO.
    :rtype: bool
    """
    ancestry = obj.__class__.__mro__
    wanted = cls.__name__
    return any(ancestor.__name__ == wanted for ancestor in ancestry)
def type_equal_sic(a, b):
    """
    Compare the types of two objects by class name.

    Equivalent in intent to ``type(a) == type(b)``, but only the type names
    are compared, which also supports objects transported across the network
    with pickle.

    :param a: The first object.
    :type a: object
    :param b: The second object.
    :type b: object
    :return: True if both objects' types have the same name.
    :rtype: bool
    """
    first_name = type(a).__name__
    second_name = type(b).__name__
    return first_name == second_name
def zip_directory(path):
    """
    Create a compressed zip file from the given directory path.

    The archive is written next to the directory, named after it, and holds
    the directory itself as its top-level entry.

    :param path: The path to the directory to be zipped. A trailing path
        separator and a bare relative name (no parent component) are both
        accepted.
    :type path: str
    :return: The path to the created zip file.
    :rtype: str
    :raises FileNotFoundError: If the path doesn't exist.
    :raises IOError: If creating the archive fails.
    """
    if not os.path.exists(path):
        raise FileNotFoundError("Path {path} does not exist".format(path=path))

    # Normalise first: with a trailing separator the basename would be empty
    # and the archive would end up as ".zip" inside the directory itself.
    normalized = os.path.normpath(path)

    # A bare relative name has no parent component; fall back to the current
    # directory instead of handing make_archive an empty root_dir.
    directory = os.path.dirname(normalized) or os.curdir
    name = os.path.basename(normalized)

    # Base name for the archive (make_archive appends the .zip extension).
    base_name = os.path.join(directory, name)

    try:
        return shutil.make_archive(base_name, "zip", directory, name)
    except Exception as e:
        raise IOError("Error while zipping: {}".format(str(e)))
def create_data_stream_id(component_id, input_source, length=16):
    """
    Derive a short identifier from a component id and an input source.

    The two values are joined with ``|``, hashed with SHA-256, encoded as
    URL-safe base64 and truncated. The same inputs always give the same id.

    :param component_id: Component identifier.
    :type component_id: str
    :param input_source: Input stream name.
    :type input_source: str
    :param length: Length of the resulting string (default 16).
    :type length: int
    :return: The first **length** characters of the base64-encoded hash.
    :rtype: str
    """
    combined = "{component_id}|{input_source}".format(
        component_id=component_id, input_source=input_source
    )
    digest = hashlib.sha256(combined.encode('utf-8')).digest()
    return base64.urlsafe_b64encode(digest).decode('utf-8')[:length]
# Running this module directly is a no-op; it only provides helper functions.
if __name__ == "__main__": pass