Welcome to the next generation 5G and post 5G computing

softgrid is a resource orchestration implementation to executing any type of computation on a grid of connected commodity hardware. It’s the perfect hybrid solution to distributing data and computation tasks that allows maximizing data availability and transfer speed without affecting the scalability of the performed calculation. In a world evolving towards extremely low latency and fast data transfer rate network infrastructures (e.g. 5G and post 5G) and abundance of computation resources availability, applications should be enabled to take advantage of such infrastructure and tailor the required performance. softgrid is a very powerful and light weight implementation that allows creating any soft grid structure of executors on any connected grid of computation hardware (local and remote) without the need to configure or install any piece of software on local or remote machines. softgrid is very flexible to the point where any computation graph can be created in short and razor sharp softgrid api calls.

Implementations like softgrid, are the only hope to enable third party entities to leverage and use complex networks (5G and post 5G) with minimal implementation that can be developed on local and personal machines and then indefinitely expanded to the global network without touching a line of code or having to install any piece of software on remote network machines.

softgrid also allows performing embarrassingly parallel independent computations as well as synchronous and asynchronous type of computations on the grid workers. No need to mention that traditional map-reduce jobs are also possible and performed at a breeze

softgrid is not a rigid software infrastructure even at runtime. A user can scale up and down and change the structure of the grid by lunching or disconnecting and shutting down executors or machines.

Finally, softgrid can be used without any effort on any cloud infrastructure and leverage the scalability of the resources real time.

Nomenclature:

  1. Orchestrator: This is the main executor of the grid and it has to be launched to create the grid. ORCHESTRATOR PROCESS IS NOT SUPPOSED TO RUN HEAVY COMPUTATIONS BUT SHOULD BE USED FOR GRID TRANSACTION PURPOSES

  2. Router: This is a simple routing executor that is used to split and bifurcate the grid and create multiple grid branches and nodes. ROUTER PROCESS IS NOT SUPPOSED TO RUN HEAVY COMPUTATIONS BUT SHOULD BE USED FOR GRID TRANSACTION PURPOSESS

  3. Manager: This is the executor that manages workers. A manager and its workers are supposed to be running on the same physcial machine. MANAGER PROCESS IS NOT SUPPOSED TO RUN HEAVY COMPUTATIONS BUT SHOULD BE USED FOR GRID TRANSACTION PURPOSES

  4. Worker: This is the worker of the grid where heavy computations should be made.

  5. gridUniqueName: Every executor when launched and connected to the grid will get attributed its own and unique grid name which can be accessed using gridUniqueName property of the executor instance. A gridUniqueName is afterall a formatted string of the following format: “role|name@address[pid]:port” where role is the executor type (orchestrator, router, manager, etc.), name is the executor name given by the user upon instanciation, address is the machine ip address, pid is the process id on the machine and port is the port of communication between the executor and its upgrid. When allowed, an executor can transfer data to a target executor on the grid using the target gridUniqueName.

  6. upgrid & downgrid: An executor (let’s call it A) is connected to the grid through another grid executor (let’s call it B). In this case B is the upgrid of A because it connects A to the grid and it’s one executor or one transfer step closer to the orchestrator than A. Every connected executor has a unique upgrid except the Orchestrator that has no upgrid because it’s the main entry to the grid. Keeping the same logic, A is a downgrid of B because because it’s one executor or one transfer step away from the orchestrator than B. Except for workers that are the end point of the grid and have no downgrids, executors can have more than one downgrid but always one upgrid.

  7. bridge: A bridge is a special direct connection between grid distant executors.

  8. gridPath: Every executor has it’s own gridPath that is a list of all directly connected executors gridUniqueName from the orchestrator to the executor itself.

Usage:

  1. On local machine to scale up computations

  2. On cloud services to leverage hexa-scale computation

  3. By IOT devices connected to fast network (5G and post 5G) to get fast response upon tasks that require heavy computations or on demand distributed data availability

_images/grid_usage.png

A softgrid grid instance is hardware independant and realtime size adjustable. Using softgrid, connected IOT devices can communicate with the network towers to perform swift low latency computations while getting and pushing updates and computations to cloud services, at home personal devices or to a business data center. For heaving computation demanding tasks, grid nodes and connection bridges can be created and later on decommissioned and destroyed as needed.


softgrid.get_version()

Get softgrid’s version number.

softgrid.get_author()

Get softgrid’s author’s name.

softgrid.get_email()

Get softgrid’s author’s email.

softgrid.get_doc()

Get softgrid’s official online documentation link.

softgrid.get_repository()

Get softgrid’s official online repository link.

softgrid.get_pypi()

Get softgrid’s pypi link.

softgrid.Globals

softgrid Globals contains all softgrid main global variables including default and machine specific rules.

The easiest way to alter the grid rules is by updating Globals.RULES prior to intializing the Orchestrator.

class softgrid.Globals.Logger(*args, **kwargs)

Bases: pysimplelog.SimpleLog.SingleLogger

This will work across the whole application on all machines

custom_init(*args, **kwargs)

Custom initialize abstract method. This method will be called at the end of initialzation. This method needs to be overloaded to custom initialize Logger instances.

Parameters
  1. *args (): This is used to send non-keyworded variable length argument list to custom initialize.

  2. **kwargs (): This is keyworded variable length of arguments. kwargs can be anything other than __init__ arguments.

softgrid_log(logType, message, gridUniqueName='role:name@address', utcTime=None, data=None, tback=None)

softgrid.Core

softgrid Core module contains all executors base implementations and definitions.

class softgrid.Core.GridBase(killAtDisconnect=None)

Bases: object

GridBase declares and handles all general properties of the excutor. Some of the defined properties are only defined in GridBase but must be handled in GridBase children. All os signal hadlers are defined in GridBase. GridBase instanciation is forbidden.

Parameters
  1. killAtDisconnect (None, Boolean): whether to kill process upon calling disconnect with kill parameters passed as None. and passed kill argument to the called disconnect method is None, then kill will be set to session killAtDisconnect boolean value and if the latter is None then it will be set to True if session is not interactive

property isDisconnected

Whether the excutor was disconnected

property killAtDisconnect

return killAtDisconnect flag value

property applicationName

Application name

property gridPath

Grid path to get to this object for Orchestrator

property gridUniqueName

grid executor grid unique name

property role

Excuctor role which can be Orchestrator, Router, Manger, etc.

property pid

Process id

property address

Machine address

property port

Executor communication port number

property portFetch

Whether to smartly fetch for available port

property password

Server communication password

property name

Executor name given by user

property softgridVersion

get softgrid used version

property uniqueName

Unique name given to all grid base instances upon instanciation

property UTCStartTime

Executor instantiation time

property startDateTime

Executor instantiation date time

property passwords

Dictionary copy of downgrid executors server password. e.g.

{'managers': {},
 'routers': {'router|Router@135.210.224.198[1234]:5002': 'd3656182-a486-11e8-846c-f45c89c22ebf',
             'router|Router@emote.machine.com[1235]:5000': 'f206112c-a486-11e8-9cf5-fa163ecc0309'},
 'workers': {}}
property paths

Copy of executors grid unique path. e.g.

{'manager': {},
 'router' : {'router|Router@135.210.224.198[1234]:5002': ['orchestrator|Orchestrator@135.210.224.198[1236]:5001'],
             'router|Router@emote.machine.com[1235]:5000': ['orchestrator|Orchestrator@135.210.224.198[1236]:5001']},
 'worker' : {}}
property routers

Downgrid routers grid unique name sorted by ‘downgrid_acknowledge_utctime’

property managers

Downgrid managers grid unique name sorted by ‘downgrid_acknowledge_utctime’

property workers

Downgrid workers grid unique name sorted by ‘downgrid_acknowledge_utctime’

kill()

Abruptly Kill executor by sending os kill signal. Calling kill result in forcefully stopping current process python execution and all executor’s downgrid will disconnect from grid. Unline disconnect, no cleanup will be made upon calling kill. All temporary files must be later manually removed. Better to always call disconnect before kill

disconnect(kill=None, killLocalDowngrids=True)

Disconnect executor from grid. All downgrids will be disconnected as well and those running on the same machine will be terminated with an os kill signal automatically if killLocalDowngrids is set to True. If running in non-interactive mode, an executor will kill itself upon disconnecting from grid. Interactive mode is True if executor is launched from a user terminal.

Parameters
  1. kill (None, Boolean): whether to kill process. If None is given then kill will be set to session killAtDisconnect boolean value and if the latter is None then it will be set to True if session is not interactive

  2. killLocalDowngrids (boolean): whether to os signal kill locally running downgrids

log_and_transfer(logType, message, tback=None, *args, **kwargs)

Template method to log and transfer logging to upgrid chain. Calling this template will result in printing log message and traceback to standard output only. Full log and transfer implementation is defined in TransferBase.

Parameters
  1. logType (string): the log type e.g. info

  2. message (string): the log message

  3. tback (None, str): the log traceback

transfer_to_executor_and_log(receiver, logType, message, tback=None, *args, **kwargs)

This is a template method that is meant to transfer a certain log to a different executor and start the logging chain at that executor. This template implementation will simply call ‘log_and_transfer’ and it is overloaded in TransferBase

Parameters
  1. receiver (string): executor grid unique name to transfer log to

  2. logType (string): the log type e.g. info

  3. message (string): the log message

  4. tback (None, str): the log traceback

split_grid_unique_name(gridUniqueName)

Split grid unique name into its different components. Raises an error if provided grid unique name is not formatted right.

Parameters
  1. gridUniqueName (str): grid unique name

Returns
  1. role (str): grid executor role

  2. name (str): grid executor name

  3. address (str): grid executor machine address

  4. pid (int): grid executor machine process PID

  5. port (int): grid executor server machine assigned port

add_object(name, obj, replace=True)

Add object to grid executor

Parameters
  1. name (string): object name

  2. obj (dict, string): object in string dumps format or a dictionary of ‘object’ key and string dumps format value and ‘loads’ key with loads module name ‘cPickle’,’pickle’ or ‘dill’

  3. replace (boolean): Whether to replace data when existing

Returns
  1. status (Collections.SoftgridStatus): Operation success or failure status.

get_object(name, default=None, serialize=False, mustExist=True)

Get object from grid executor

Parameters
  1. name (string): object name

  2. default (object): default object to return in case data does not exist

  3. serialize (boolean): whether to serialize object prior to returning

  4. mustExist (boolean): whether object name must exist

Returns
  1. object (object, Collections.SoftgridStatus): object defined under given name. If failed, SoftgridStatus will be returned.

pop_object(name, default=None, serialize=False, mustExist=False)

Pop and return object from grid executor

Parameters
  1. name (string): object name

  2. default (object): default value to return in case object does not exist

  3. serialize (boolean): whether to serialize prior to returning

  4. mustExist (boolean): whether object name must exist

Returns
  1. object (object, Collections.SoftgridStatus): poped object defined under given name. If failed, SoftgridStatus will be returned.

add_data(name, data, replace=True)

Add data to grid executor

Parameters
  1. name (string): data name

  2. data (object): data object

  3. replace (boolean): Whether to replace data when existing

Returns
  1. status (Collections.SoftgridStatus): Operation success or failure status.

get_data(name, default=None, mustExist=True)

Get data from grid executor

Parameters
  1. name (string): data name

  2. default (object): default value to return in case data does not exist

  3. mustExist (boolean): whether data name must exist

Returns
  1. data (object, Collections.SoftgridStatus): Data object defined under given name. If failed, SoftgridStatus will be returned.

pop_data(name, default=None, mustExist=False)

Pop and return data from grid executor

Parameters
  1. name (string): data name

  2. default (object): default value to return in case data does not exist

  3. mustExist (boolean): whether data name must exist

Returns
  1. data (object, Collections.SoftgridStatus): Poped data object defined under given name. If failed, SoftgridStatus will be returned.

add_file(name, path, data, root='temporary', writeFunc=None, wargs=None, wkwargs=None, readFunc=None, replace=True)

Add file to machine local drive given a reference name, path, data, write and read functions.

Parameters
  1. name (string): file name to refer to during execution.

  2. path (string): relative path to given root in local machine to write file

  3. data (None, object): file data to write to disk using writeFunc or open(path).write mehtod. If None is given, then a file in absolute path must exist. Setting data to None is used to add file to multiple executors in the same machine.

  4. root (None, str): root of path to where to write the file. If None then given path is absolute. Otherwise it must be either ‘’ which is equivalent to None or ‘temporary’ or ‘permanent’ so data path will be relative to respectively temporary or permanent directory.

  5. writeFunc (None, str, dict, callable): writing file function. if given, first argument must be the data and the second argument the file path.

  6. wargs (None, list): write function func list of arguments

  7. wkwargs (None, list): write function func dictionary of keyword arguments

  8. readFunc (None, str, dict, callable): reading file function. If given, first argument must be reserved for the file path.

  9. replace (boolean): Whether to replace file when existing and all file names pointing to the replaced file will be automatically removed.

Returns
  1. status (Collections.SoftgridStatus): Operation success or failure status.

get_file(name, rargs=None, rkwargs=None, default=None, mustExist=True)

Get softgrid stored file from local drive.

Parameters
  1. name (string): file name as refered to in executor.

  2. rargs (None, list): read function func list of arguments

  3. rkwargs (None, list): read function func dictionary of keyword arguments

  4. default (object): Default value to return in case file does not exist

  5. mustExist (boolean): Whether data name must exist

Returns
  1. data (object): File value as read using open(path).read or using defined readFunc. If failed to getting file SoftgridStatus will be returned.

pop_file(name, rargs=None, rkwargs=None, default=None, mustExist=True, delete=False)

Pop and return softgrid file data from grid executor.

Parameters
  1. name (string): file name as refered to in executor.

  2. rargs (None, list): read function func list of arguments

  3. rkwargs (None, list): read function func dictionary of keyword arguments

  4. default (object): default value to return in case file does not exist

  5. mustExist (boolean): whether data name must exist.

  6. delete (boolean): whether to delete file on disk as well

Returns
  1. fileParams (dictionary, Collections.SoftgridStatus): file parameters with ‘data’ key added. If failed, SoftgridStatus will be returned.

add_udf(name, func, firstArgExecutor, replace=True)

Add user defined function to grid executor

Parameters
  1. name (string): user defined function unique name to refer to. If given func is a string definition of the function then name must be the funtion name as defined in the func string

  2. func (callable, string): Callable or function source code to compile.

  3. firstArgExecutor (boolean): Whether func first argument must be the executor object. Executor object is remote therefore it can’t be passed among fargs or fkwargs. This flag allows the executor executing func to pass self as first argument to func followed by *fargs and **fkwargs

  4. replace (boolean): Whether to replace function when existing.

Returns
  1. status (Collections.SoftgridStatus): Operation success or failure status.

get_udf(name, default=None, mustExist=True)

Get function from grid executor.

Parameters
  1. name (string): function name.

  2. default (object): default value to return in case function does not exist

  3. mustExist (boolean): whether function name must exist.

Returns
  1. function (object): Function defined under given name. If failed, SoftgridStatus will be returned.

pop_udf(name, default=None, mustExist=False)

Pop and return function from grid executor.

Parameters
  1. name (string): Function name

  2. default (object): Default value to return in case function does not exist

  3. mustExist (boolean): Whether function name must exist

Returns
  1. function (object, Collections.SoftgridStatus): poped function defined under given name. If failed, SoftgridStatus will be returned.

add_package(packageDict, relativeRoot='', description='', sitePackages='temporary', replace=False)

Add user defined package or module to importable definitions

Parameters
  1. packageDict (dictionary): module or package dictionary as created using softgrid.Collection.read_files function.

  2. relativeRoot (string): name of directory (package) under which to store the module or package files. If empty, files and directories will be created directly under site-packages.

  3. description (None, string): some description that will be used as SoftgridStatus return name.

  4. sitePackages (string): designate where to store the package. It must be any ‘temporary’, ‘permanent’, ‘user’ or ‘memory’.

  5. replace (boolean): Whether to replace package or module if existing.

Returns
  1. status (Collections.SoftgridStatus): Operation success or failure status.

class softgrid.Core.MachineManagement(rules=None, *args, **kwargs)

Bases: softgrid.Core.GridBase

MachineManagement defines softgrid machines management at every executor. It enables every deployed executor to be aware of its environment and platform. Rules can be set for every machine and deployed softgrid executor. If machine rules are not given, default rules will be applied automatically from softgrid.Globals.RULES dictionary.

In order to customize a grid, it’s preferable to set machine rules before launching any executor.

Parameters
  1. rules (None, dict): rules and parameters given to local and remote machines.

    {'machine_maximum_number_of_managers': 1,
     'machine_maximum_number_of_routers': None,
     'machine_maximum_number_of_workers': None}
    
property machines

Copy of executor downgrid machines and softgrid executors dictionary. e.g.

{'135.210.224.198': {'manager': [],
                     'machine_number_of_cpu': 8,
                     'router': ['router|Router@135.210.224.198[1234]:5001',
                                'router|Router@135.210.224.198[1235]:5002'],
                     'worker': []},
'remote.machine.com': {'manager': [],
                       'machine_number_of_cpu': 12,
                       'router': ['router|Router@zremote.machine.com[12345]:5000'],
                       'worker': []}}
property machineNumberOfCores

Number of cores on current machine

property machinesAddress

Executor downgrid machines addresses list

property rules

Copy of rules dictionary set by the executor e.g.

{'overload_machine_rules': {},
 'machine_maximum_number_of_managers': 1,
 'machine_maximum_number_of_routers': None,
 'machine_maximum_number_of_workers': -1,
 'execution_allow_negative_priority': True,
 'execution_allow_none_priority': True,
 'log_manager_kwargs': {'logToFile': False,
                        'logToStdout': True,
                        'stdoutMaxLevel': None},
 'log_orchestrator_kwargs': {'logFileBasename': 'softgrid_application_2019-09-26-07-41-36',
                             'logToFile': False,
                             'logToStdout': True,
                             'stdoutMaxLevel': None},
 'log_router_kwargs': {'logToFile': False,
                       'logToStdout': True,
                       'stdoutMaxLevel': None},
 'log_worker_kwargs': {'logToFile': False,
                       'logToStdout': True,
                       'stdoutMaxLevel': None},
 'orchestrator_allow_managers': True,
 'orchestrator_allow_workers': True,
 'timeout_long': 20,
 'timeout_long_multiplier': 3,
 'timeout_short': 5,
 'timeout_short_multiplier': 3,
 'transfer_collect_sleep_time': 0.001,
 'transport_implementation': 'multiprocessing'}
get_machine_rules(address)

Get copy of machine rules given its address. Default rules are returned if machine rules are not specified

Parameters
  1. address (str): Machine address.

Returns
  1. rules (dict): Copy of machine rules dictionary.

set_machine_rules(address, rules)

Set machine’s rules. If machine is already in grid chain then new rules are tested to see if they are valid with current machine state. If not then rules are rejected.

Parameters
  1. address (str): Machine address.

  2. rules (dict): Machine new rules. Some of rules keys are:

    • machine_maximum_number_of_routers (None, integer): Maximum number of routers allowed on machine. If None, then unlimited number of routers can be launched on machine.

    • machine_maximum_number_of_managers (None, integer): Maximum number of managers allowed on machine. If None, then unlimited number of managers can be launched on machine.

    • machine_maximum_number_of_workers (None, integer): Maximum number of workers allowed on machine. If None, then as many workers as number of physical cpu cores on machine. If negative integer is given then this will be substracted from machine number of physical cpu cores.

get_machine_info(address)

Get machine grid information.

Parameters
  1. address (str): Machine address.

Returns

  1. info (dict): Machine information dictionary.

get_remaining_executors_allowed(address)

Get remaining allowed number of routers, managers and workers on machine as given in this executor rules

Parameters
  1. address (str): Machine address.

Returns

  1. allowed (dict): Machine information dictionary.

e.g.

{'number_of_routers': 0,
 'number_of_managers': 1,
 'number_of_workers': 7}

class softgrid.Core.TransferBase(*args, **kwargs)

Bases: softgrid.Core.MachineManagement

TransferBase inherits MachineManagement and defines all transfer methods, properties and definitions. This class defines all transfer method that allow executors to communicate between each other and allow user to interact with local and remote grid executors. Instanciation of this class is forbidden

get_managers_to_workers_receivers_bounce_lists()

Get downgrids managers receivers and respective workers as bounce list of lists

Returns
  1. receivers (list): List of downgrid managers grid unique names

  2. bounce (list): List of lists of downgrid workers grid unique names

get_transfer_template(func=None, args=None, kwargs=None, priority=1, bounce=None, promisePassword=None, returnResult=True, storeResult=False, resultName=None)

Get transfer prefilled dictionary data template.

Parameters
  1. func (None, str, callable): Function to execute remotely. Default value is None but this will raise an error if not set correctly.

  2. args (None, tuple, list): Function func arguments.

  3. kwargs (None, tuple, list): Function func keyword arguments.

  4. bounce (None, str, list): Bounce is used to bounce transfer to multiple receiver after reaching initial receiver. This is ultimately used to transfer to workers that are managed by the same manager. If not None, bounce must be a list of the same length as the normalized number of receivers. bounce list can contain None, grid unique names of sublists of grid unique names.

  5. priority (None, number): Execution queue priority.

  6. promisePassword (None, str): Execution job promise password.

  7. returnResult (None, str): Whether to execute function ‘func’ on remote executor and return back the result.

  8. storeResult (None, str): Whether to execute function ‘func’ on remote executor and store it on local executor.

  9. resultName (None, str): result handling name. If None, when returnResult is True, returned result will not be stored but logged instead.

Returns
  1. template (dict): Transfer dictionary prefilled template of all of the following key value pairs:

    • mode: transfer mode which should be ‘execute’

    • propagate_upgrid: whether to propage to executor upgrid

    • propagate_routers: whether to propage to executor downgrid routers

    • propagate_managers: whether to propage to executor downgrid managers

    • propagate_workers: whether to propage to executor downgrid workers

    • propagate_recursive: whether to propagate all the above recursively

    • bounce_transfer: used to bounce transfer to multiple receivers

    • function: the callable function or string referencing remote executor callable attribute

    • first_argument_executor: always None

    • promise_password: the execution job promise password.

    • args: function arguments to pass to upon execution

    • kwargs: function keyword arguments to pass to upon execution

    • result_handling: dictionary of rules to handling returned value from executed function

    • priority: job priority in execution queue

# standard library imports
import sys

# import softgrid orchestrator
from softgrid import Orchestrator

# instanciate orchestrator
O = Orchestrator(password='some_password_12345', acceptWorkers=True)

# create workers
status, ss,fl = O.launch_worker(upgridUniqueName=O.gridUniqueName,
                                executorAddress=O.address,
                                executorKwargs=None,
                                machineUser='some_user')
for s in status:
     print(s)

# define some function to transfer
def power(n,p):
    return n**p

# get transfer template
transferKwargs = O.get_transfer_template(func=power,
                                         args=(),
                                         kwargs={'n':10, 'p':6},
                                         priority=None,
                                         promisePassword=None,
                                         returnResult=True,
                                         storeResult=False,
                                         resultName=None)

# transfer and execute on a remote worker
result = O.transfer(transferKwargs=transferKwargs,
                       receivers=O.workers[-1],
                       collect=True)

# print result
print(result)

>>> [1000000]
get_transfer_wait_timeout(transferTimeout=10, multiplier=3, pathLength=None)

Get transfer wait for response timeout calculate using a single transfer timeout ‘transferTimeout’ and number of transfers multiplier.

Parameters
  1. transferTimeout (number): single transfer wait for result timeout

  2. multiplier (number): timeout multiplier. Wait timeout is calculated as timeout * ceil( pathLength/multiplier )

  3. pathLength (None, integer): number of executor the data will transfer to. If None is given, it’s assumed that the transfer is going upgrid till orchestrator and therefore pathLength will be set to len(self.gridPath)

Returns
  1. timeout (number): timeout waiting time. calculated as transferTimeout * ceil( transferPath/multiplier )

get_small_transfer_wait_timeout(pathLength=None)

Alias for get_transfer_wait_timeout using transferTimeout=rules[‘timeout_short’] and multiplier=rules[‘timeout_short_multiplier’]

get_big_transfer_wait_timeout(pathLength=None)

Alias for get_transfer_wait_timeout using transferTimeout=rules[‘timeout_long’] and multiplier=rules[‘timeout_long_multiplier’]

transfer(transferKwargs, receivers, allowBridging=True, bounce=None, collect=False, transferTimeout=None, multiplier=9223372036854775807)

Transfer executions across the grid using transfer keyword arguments and a set of receivers. Bouncing from a receiver to multiple is possible and adviseable to reduce network usage especially when bouncing is set between a manager and its downgrid workers. Bouncing transfers will have their transfer_unique_id changed at the bouncer receiver to transfer_unique_id +’==>’+ bounce_receiver_grid_unique_name

Parameters
  1. transferKwargs (dict, list): dictionary of transfer arguments. If list is given, it must be a list of dictionaries of the same length as the normalized number of receivers.

  2. receivers (string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Receivers list will be normalized and expanded whether keywords are used or not. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers

  3. allowBridging (None, boolean, list): Whether to allow using connection bridges. If None is given, transferKwargs[‘allow_bridging’] will be used. If list is given it must be a list of None or boolean of the same length as the normalized number of receivers.

  4. bounce (None, str, list): Bounce is used to bounce transfer to multiple receiver after reaching initial receiver. This is ultimately used to transfer to workers that are managed by the same manager. If not None, bounce must be a list of the same length as the normalized number of receivers. bounce list can contain None, grid unique names of sublists of grid unique names.

  5. collect (boolean): Whether to collect and return transfered executions results. This will block interactivity until transfers to all receivers are returned. If transferTimeout is not None, collect will be constrained with the same timeout settings. If a transfer fails, SoftgridStatus will be returned with successful flag set to False, error attribute populated appropriately and data attribute containing transfer kwargs data that can be used used to resend the transfer if needed.

  6. transferTimeout (None, number): transfer result return with timeout. If None is given, then no timeout condition is set for returned result.

  7. multiplier (number, list): timeout multiplier. Wait timeout is calculated as timeout * ceil( len(gridPath)/multiplier )

Returns
  1. values (Collections.SoftgridStatus, None, list): If any error occurs, Collections.SoftgridStatus with failure flag will be returned. Otherwise, if collect is False, None will be returned. If collect is True, a list of all returned values from all receivers is returned.

# standard library imports
from __future__ import print_function
import sys, multiprocessing

# softgrid imports
from softgrid import Orchestrator, Globals

# Default settings forbid user to execute his own defined functions
# (UDF) on grid's routers and managers. This is meant to prevent
# cluttering the network and the machines with work that is only
# meant to be ran and managed by grid workers. In this particular
# case, for the sake of this example, we need to allow executing UDF
# and this can be done by altering some grid default rules. Those
# rules must be modified prior to instanciating the orchestrator

# Allow executing user defined functions on routers and managers
Globals.RULES["router_allow_executing_udf"]  = True
Globals.RULES["manager_allow_executing_udf"] = True

# Allow launching as many managers as one wants on a single machine.
# By default it's allowed to launch one manager by machine
Globals.RULES['machine_maximum_number_of_managers'] = None

# create orchestrator
O = Orchestrator(password='nopassword', acceptWorkers=False)

## launch four local routers
status, routers, fl = O.launch_router(upgridUniqueName=O.gridUniqueName,
                                      executorAddress=[O.address]*4,
                                      executorKwargs=None,
                                      machineUser='some_user')
for s in status:
    print(s)

# get routers gridUniqueName
routers = [e['gridUniqueName'] for e in routers]

## launch four local managers
status, managers, fl = O.launch_manager(upgridUniqueName=routers[-1],
                                        executorAddress=[O.address]*4,
                                        executorKwargs=None,
                                        machineUser='some_user')
for s in status:
    print(s)

# get managers gridUniqueName
managers = [e['gridUniqueName'] for e in managers]

## launch local workers
ncpu = O.machineNumberOfCores - 1
status, workers, fl = O.launch_worker(upgridUniqueName=managers[-1],
                                      executorAddress=[O.address]*ncpu,
                                      executorKwargs=None,
                                      machineUser=['some_user']*ncpu)
for s in status:
    print(s)

# get workers gridUniqueName
workers = [e['gridUniqueName'] for e in workers]

# create a user defined function that takes the
# remote executor as it's first argument
def get_gun(executor):
    from datetime import datetime
    t = datetime.now().strftime("%H:%M:%S.%f")
    return "%s --> %s"%(t, executor.gridUniqueName)


# create transfer data using helper function get_transfer_template
transferKwargs = O.get_transfer_template(func=get_gun, priority=None)

# set 'first_argument_executor' flag to true.
# This will tell remote executor to pass itself as the funtion first argument
transferKwargs['first_argument_executor'] = True

# create list of receivers which is a list of all routers, managers
# and workers grid unique names
receivers = O.routers+O.managers+O.workers

# transfer to receivers
guns = O.transfer(transferKwargs=transferKwargs, receivers=receivers, collect=True)

# print returned results which is a list of strings of the
# format timestamp --> gridUniqueName
print(guns)
log_and_transfer(logType, message, data=None, tback=None, gridUniqueName=None, utcTime=None, transferRoute=None, logLocally=True)

Log and transfer log to upgrid executor recursively.

Parameters
  1. logType (string): The log type

  2. message (string): The logging message

  3. data (object): any type of data to log

  4. tback (None, traceback.extract_stack): The traceback extracted stack

  5. gridUniqueName (string): Grid unique name of log initiator

  6. utcTime (number): Log initial time using time.time() function

  7. logLocally (boolean): Whether to log locally prior to transfer

  8. transferRoute (list): should never be used other than internally

# standard library imports
from __future__ import print_function
import sys, multiprocessing

# softgrid imports
from softgrid import Orchestrator

# create orchestrator
O = Orchestrator(password='nopassword', acceptWorkers=True)

## launch local workers
ncpu = O.machineNumberOfCores - 1
status, workers, fl = O.launch_worker(upgridUniqueName=O.gridUniqueName,
                                      executorAddress=[O.address]*ncpu,
                                      executorKwargs=None,
                                      machineUser=['some_user']*ncpu)
for s in status:
    print(s)

# get workers gridUniqueName
workers = [e['gridUniqueName'] for e in workers]

# create a user defined function that takes the
# remote executor as it's first argument
def log_and_get_gun(executor):
    from datetime import datetime
    t = datetime.now().strftime("%H:%M:%S.%f")
    executor.log_and_transfer(logType='info', message="I'm logging '%s' in my UDF"%t)
    return "%s --> %s"%(t, executor.gridUniqueName)

# create transfer data
transferKwargs = O.get_transfer_template(func=log_and_get_gun, priority=None)

# set 'first_argument_executor' flag to true.
# This will tell remote executor to pass itself as the funtion first argument
transferKwargs['first_argument_executor'] = True

# create list of receivers which is a list of all workers
# gridUniqueName
receivers = O.workers

# transfer to all receivers
guns = O.transfer(transferKwargs=transferKwargs, receivers=receivers, collect=True)

# print returned results which is a list of strings of the
# format timestamp --> gridUniqueName
print(guns)
transfer_to_executor_and_log(receiver, logType, message, tback=None, largs=None, lkwargs=None)

Transfer log data to a target executor then log_and_transfer from there.

Parameters
  1. receiver (string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers

  2. largs (None, list, tuple): log_and_transfer remaining arguments

  3. lkwargs (None, dict): log_and_transfer remaining keyword arguments.

# standard library imports
from __future__ import print_function
import sys, multiprocessing

# softgrid imports
from softgrid import Orchestrator, Globals

# allow multiple managers on the same machine. This is not a good
# practice but it's ok for the sake of this example
Globals.RULES['machine_maximum_number_of_managers'] = None

# create orchestrator
O = Orchestrator(password='nopassword', acceptWorkers=True)

## launch six local routers
status, routers,fl = O.launch_router(upgridUniqueName=O.gridUniqueName,
                                     executorAddress=[O.address]*6,
                                     executorKwargs=None,
                                     machineUser='some_user')
for s in status:
    print(s)

# get routers gridUniqueName
routers = [e['gridUniqueName'] for e in routers]

## launch six local managers
status, managers, fl = O.launch_manager(upgridUniqueName=routers[-1],
                                        executorAddress=[O.address]*6,
                                        executorKwargs=None,
                                        machineUser='some_user')
for s in status:
    print(s)

# get workers gridUniqueName
managers = [e['gridUniqueName'] for e in managers]

## launch local workers connecting directly to orchestrator
ncpu = O.machineNumberOfCores - 1
status, workers, fl = O.launch_worker(upgridUniqueName=O.gridUniqueName,
                                      executorAddress=[O.address]*ncpu,
                                      executorKwargs=None,
                                      machineUser=['some_user']*ncpu)
for s in status:
    print(s)

# get workers gridUniqueName
workers = [e['gridUniqueName'] for e in workers]

# create a user defined function that takes the
# remote executor as it's first argument
def transfer_log_and_get_gun(executor, receivers):
    import random
    from datetime import datetime
    # get time
    t = datetime.now().strftime("%H:%M:%S.%f")
    # get all workers different than this one
    receivers = [w for w in receivers if w!=executor.gridUniqueName]
    # select a worker to transfer and log from
    receiver = random.choice(receivers)
    # transfer and log
    m = "I am '%s', transfering my log '%s' to '%s' and initiate the log from there"%(executor.gridUniqueName,t,receiver)
    executor.transfer_to_executor_and_log(receiver=receiver, logType='info', message=m)
    # return data
    return "%s --> %s"%(t, executor.gridUniqueName)

# create transfer data
transferKwargs = O.get_transfer_template(func=transfer_log_and_get_gun, priority=None, kwargs={'receivers':O.routers+O.managers})

# set 'first_argument_executor' flag to true.
# This will tell remote executor to pass itself as the funtion first argument
transferKwargs['first_argument_executor'] = True

# create list of receivers which is a list of all workers
# gridUniqueName
receivers = O.workers

# transfer to all receivers
guns = O.transfer(transferKwargs=transferKwargs, receivers=receivers, collect=True)

# print returned results which is a list of strings of the
# format timestamp --> gridUniqueName
print(guns)
class softgrid.Core.RequesterResponderBase(*args, **kwargs)

Bases: softgrid.Core.TransferBase

Base implementation to perform synchronous and asynchronous computations over the grid. This is implemented using transfer request methods and wait for request and wait for response methods.

transfer_request_to_responder(responder, password, request, timeout=3600, allowedResponders=None, register=True, errorRaise=False, errorLog=True)

Transfer a request for a response to responder executor. This can be used as an execution hold for synchronous or asynchronous grid computation until a response is received from a responder grid executor

Parameters
  1. responder (string, list): responder grid unique name or a list of responders.

  2. password (string, list): request password. If a response to request is received with wrong password it will be refused. If responder is a list of multple items, password must be a list of unique strings

  3. request (object, list): any transferable data that is used as the request. This can be a message or any other data that a responder might need to answer the request. If responder is a multiple items list, request must be a list of the same size

  4. timeout (None, int, float, list): number of seconds to wait for a response. If None is given then Globals.maxint is set. If list is given it must be of the same size as responder

  5. allowedResponders (None, string, list): allowed executors that are allowed to respond to the given request. If None, any executor response will be accepted as long as the password is correct.

  6. register (boolean): whether to register request for response tracking. If False, request won’t ge registered nor tracked for response. This is useful as a last request to responders to stop waiting for requests.

  7. errorRaise (boolean, list): whether to raise an error if when transfering a request a error occured

  8. errorLog (boolean, list): whether to log if request does generate an error

# In this code snippet we will be performing synchronous
# computation between the orchestrator and a random worker.
# The orchestrator will launch remote_increment function on a
# worker. This function will instanciate a variable called
# totalSum increment its value by incrementValue constant and
# sends it back to the orchestrator as a request for response.
# The worker execution will block until orchestrator responds back
# with the response for the request. At this time the orchestrator
# will be waiting to receive a request from the worker
# using wait_for_request method. Upon receiving the request,
# the orchestrator will get totalSum value, increments it by
# incrementValue and responds it back to the worker which was doing
# nothing but waiting for the response back from the orchestrator.
# This cycle will be repeated 100 time until finally the
# total value is logged using log_and_transfer method.


# standard library imports
import time, uuid, sys

# softgrid imports
from softgrid import Orchestrator

# create some funky password
t = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime(time.time()))
s = str(uuid.uuid1())
PASSWORD = "%s_%s"%(t,s)

# create orchestrator
O = Orchestrator(password='some_password_12345', acceptWorkers=True)

# create local workers
ncpu = O.machineNumberOfCores - 1
status, workers, fl = O.launch_worker(upgridUniqueName=O.gridUniqueName,
                                      executorAddress=[O.address]*ncpu,
                                      executorKwargs=None,
                                      machineUser=['some_user']*ncpu)
for s in status:
    print(s)

# get workers gridUniqueName
workers = [e['gridUniqueName'] for e in workers]

# define remote function to execute at the requester side
def remote_increment(executor, responder, password, startValue, incrementValue, cycles):
    import time
    totalSum = startValue
    for step in range(cycles):
        # increment total sum
        totalSum += incrementValue
        # send transfer for a response
        executor.transfer_request_to_responder(responder=responder, password=password, request=totalSum)
        # wait for response
        result = executor.wait_for_response(password=password,timeout=20,frequency=0.0005)
        # get result from responder
        totalSum = result[password]['response']['value']

# create function to call at orchestrator side which is the
# responder's side in this case. this function will transfer
# 'remote_increment' to a worker and start request and respond loops
def transfer_and_start_respond_to_requests(O=O):
    # create transfer data
    transferKwargs = O.get_transfer_template(func=remote_increment)
    transferKwargs["kwargs"] = {'responder':O.gridUniqueName,
                                'password':PASSWORD,
                                'startValue':1,
                                'incrementValue':1,
                                'cycles':100}
    transferKwargs['first_argument_executor'] = True
    transferKwargs['priority']                = None
    # transfer remote_increment to worker and start executing remotely
    O.transfer(transferKwargs=transferKwargs,
               receivers=O.workers[-1],
               collect= False)
    # initiate wait for request on orchestrator
    startTime = time.time()
    stepsTime = []
    for step in range(100):
        tic = time.time()
        # wait for request of certain password
        request = O.wait_for_request(password=PASSWORD, frequency=0.0005)
        # get request value and add 1
        value   = request[PASSWORD]['request'] + 1
        # get requester which should be the same as O.workers[-1]
        requester = request[PASSWORD]['requester']
        # respond to requester
        O.transfer_response_to_requester(requester=requester, password=PASSWORD, response=value)
        stepsTime.append(time.time()-tic)
    endTime = time.time()
    time.sleep(1) # wait for all logs before logging final message
    message = "Total request and wait for response time for %s back and forth cycles is %.5f sec. (avg/cycle %.5f)"%(100,endTime-startTime, sum(stepsTime)/len(stepsTime))
    O.log_and_transfer('info', message)

# execute function
transfer_and_start_respond_to_requests()
transfer_asynchronous_request(requester, newResponder, password, response)

Transfer a request from one responder to another. This is must be called at the responder side when more information is needed from another executor responder prior to responding back to the requester. This mechanism allows asynchronous calculation on the grid where a worker requester halts execution waiting for a response that might travel between multiple workers before returning back to the requester.

Parameters
  1. requester (string, list): The original requester grid unique name or a list or original requesters.

  2. newResponder (string, list): New responder to the original request sent by requester. Or a list of new responder of the same size as requester list

  3. password (string, list): request password or a list of passwords if of the same size as requester list

  4. response (object): Any object to transfer to the newRequester. In general this can be the original request or an updated request

# In this code snippet we will be performing asynchronous
# computation between a randomly selected requester worker and
# the remaining of the workers as responders.
# The orchestrator will transfer and execute 'request_function'
# at the requester side and will transfer and execute
# 'respond_function' at all responders side.
# 'request_function' function  will loop through all given
# NUMBER_OF_CYCLES request for response cycles. The request for
# response is set to a randomly selected a responder from the list
# of responders. The request received at the responder
# side might get transfered to multiple other responders before
# returning back to the requester. The execution will be waiting for
# all that time because of the called method 'wait_for_response'.
# Once the response reaches the requester, the time to get back
# the response is calculated, appended to the response and the
# cycle loops back to send another request for response.
# At the responder side, where 'respond_function' function is
# executed, responders are waiting for any request. Once one is
# received, a random number is drawn to decide whether to respond
# or to transfer to an asynchronous responder at a probability
# of ASYNCHRONOUS_PROBA. When a request is transfered more than
# MAX_ASYNCHRONOUS_TRANSFERS the responder will respond back to
# request without additional asynchronous transfers.


# standard library imports
import time, uuid, sys, random

# softgrid imports
from softgrid import Orchestrator, Globals

######################### USER PARAMETERS ##########################
MAX_ASYNCHRONOUS_TRANSFERS = 10   # maximum asynchronous request transfer per request
ASYNCHRONOUS_PROBA         = 0.85 # probability or asynchronous request transfer
NUMBER_OF_CYCLES           = 200  # this is the total number of requestss


####################### DON'T TOUCH FROM HERE ######################
## allow machine to accept any number of workers
Globals.RULES['machine_maximum_number_of_workers']  = None

# create some funky request password
t = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime(time.time()))
s = str(uuid.uuid1())
PASSWORD = "%s_%s"%(t,s)

######################## LAUNCHING THE GRID ########################
# create orchestrator
O = Orchestrator(password='some_password_12345', acceptWorkers=True)

# executor kwargs used here to mute some worker grid log types
executorKwargs = {'logKwargs':{'logTypes':{'execution_started': {'stdoutFlag':False, 'fileFlag':False},
                                           'execution_finished':{'stdoutFlag':False, 'fileFlag':False},
                                           'result':            {'stdoutFlag':False, 'fileFlag':False},
                                           }
                              }
                  }

# create local workers
status, workers,_ = O.launch_worker(upgridUniqueName=O.gridUniqueName,
                                    executorAddress=[O.address]*10,
                                    executorKwargs=executorKwargs,
                                    machineUser=['some_user']*10)
for s in status:
    print(s)

# get workers gridUniqueName
workers = [e['gridUniqueName'] for e in workers]

# select random requester from the list of workers
requester  = random.choice(workers)
# the remaining of the workers are designated responders
responders = [w for w in workers if w != requester]

################### DEFINING REQUESTER FUNCTION ####################
def request_function(executor, responders, password, numberfOfCycles):
    import time, random
    from datetime import datetime
    result = []
    for step in range(numberfOfCycles):
        # initiate request data
        tic       = time.time()
        responder = random.choice(responders)
        request   = {step:[]}
        # send transfer request
        executor.transfer_request_to_responder(responder=responder, password=password, request=request)
        # wait for response
        response = executor.wait_for_response(password=password,timeout=20,frequency=0.0005)
        # get response dictionary value
        response = response[password]['response']['value']
        # add total request to response time
        response['total_time'] = time.time()-tic
        result.append( response )
        # log and transfer cycle step finished
        tt  = response['total_time']
        ls  = len(response[step])
        avg = float(tt)/float(ls)
        msg = "Finished cycle '%i' in '%.4f' sec. for '%i' asynchronous transfers averaging '%.4f' sec. per transfer"%(step,tt,ls,avg)
        executor.log_and_transfer(logType='info', message=msg)
    # send final requests to stop all responders from waiting for requests
    executor.transfer_request_to_responder(responder=responders, password=[password]*len(responders), request='STOP', register=False)
    # return result
    return result

################### DEFINING RESPONDERS FUNCTION ###################
def respond_function(executor, requester, responders, password, maxAsynchronousTransfers, asynchronousProba):
    from datetime import datetime
    import random
    responders = [w for w in responders if w != executor.gridUniqueName]
    while True:
        request = executor.wait_for_request(password=password, timeout=120, frequency=0.0005)
        # get request value
        value = request[password]['request']
        # check for request to break
        if value == 'STOP':
            executor.log_and_transfer(logType='info', message="Received 'STOP' request. Exiting 'wait_for_request' loop")
            break
        k = list(value)[0]
        value[k].append((executor.gridUniqueName, datetime.now().strftime("%H:%M:%S.%f")))
        # if path length is bigger than 10, or random > asynchronousProba then respond back to requester
        if len(value[k])>=maxAsynchronousTransfers or random.random()>asynchronousProba:
            executor.transfer_response_to_requester(requester=requester, password=password, response=value)
        else:
            gun = random.choice(responders)
            executor.transfer_asynchronous_request(requester=requester, newResponder=gun, password=password, response=value)


################## TRANSER FUNCTION TO RESPONDERS ##################
transferKwargs = O.get_transfer_template(func=respond_function)
transferKwargs["kwargs"] = {'requester':requester,
                            'responders':responders,
                            'password':PASSWORD,
                            'maxAsynchronousTransfers':MAX_ASYNCHRONOUS_TRANSFERS,
                            'asynchronousProba':ASYNCHRONOUS_PROBA}
transferKwargs['first_argument_executor'] = True
transferKwargs['priority']                = None
O.transfer(transferKwargs=transferKwargs,
           receivers=responders,
           collect= False)

########## TRANSER FUNCTION REQUESTER AND WAIT FOR RESULT ##########
transferKwargs = O.get_transfer_template(func=request_function)
transferKwargs["kwargs"] = {'responders':responders,
                            'password':PASSWORD,
                            'numberfOfCycles':NUMBER_OF_CYCLES}
transferKwargs['first_argument_executor'] = True
transferKwargs['priority']                = None
# transfer remote_increment to worker and start executing remotely
result = O.transfer(transferKwargs=transferKwargs,
                    receivers=requester,
                    transferTimeout= 3600,
                    collect= True)


########################## PLOT HISTOGRAMS #########################
cyclesTime     = [[] for _ in range(MAX_ASYNCHRONOUS_TRANSFERS)]
firstResponder = dict([(w,0) for w in responders])
asyncResponder = dict([(w,0) for w in responders])
for idx, r in enumerate(result[0]):
    tt = r['total_time']
    rt = r[idx]
    cyclesTime[len(rt)-1].append(tt)
    firstResponder[rt[0][0]] += 1
    for r in rt[1:]:
        asyncResponder[r[0]] += 1

# compute averages
cyclesAvg = [0 if not len(i) else sum(i)/len(i) for i in cyclesTime]


# plot asynchronous transfer time histogram
import matplotlib.pyplot as plt
_ = plt.figure()
_ = plt.bar([i for i in range(1, len(cyclesAvg)+1)], cyclesAvg, .8)
_ = plt.xlabel('Number of asynchronous transfers per request')
_ = plt.ylabel('Time (sec.)')
_ = plt.xticks(range(1,len(cyclesTime)+1))
for i,d in enumerate(cyclesTime):
    _ = plt.text(x=i+1 , y=cyclesAvg[i] , s=f"{len(d)}" , fontdict=dict(fontsize=10,horizontalalignment='center'))

# plot number of transfers per responder
_ = plt.figure()
wlist = list(firstResponder)
_ = plt.bar(range(len(responders)), [firstResponder[k] for k in wlist], 0.8, label='First Responder')
_ = plt.bar(range(len(responders)), [asyncResponder[k] for k in wlist], 0.8, bottom=[firstResponder[k] for k in wlist], label='Asynchronous Responder')
_ = plt.legend()
_ = plt.ylabel('Number of transfers')
_ = plt.xticks(range(len(wlist)), range(1,len(wlist)+1))
_ = plt.xlabel('Responders')
plt.tight_layout()
plt.show()
_images/transfer_asynchronous_request_1.png

This distribution shows the average time to receive the response for asynchronous request transfers. Y axis is the time in seconds while the X axis the is the number of asynchronous transfers. The numbers on top of each bar are the number of recorded requests corresponding to the bar. e.g. 13 on top of the bar at x=5 means that there was 13 requests that were 5 times asynchronously transfered before reaching the requester

_images/transfer_asynchronous_request_2.png

This distribution shows the number of transfers per responder. X axis is the 9 responding workers and Y axis is the number of transfers reaching a responder. The blue bars distribution show the number of new requests reaching the first responder. The orange bar distribution is the number of asynchronous request transfer reaching the responders

transfer_response_to_requester(requester, password, response, blocking=False, errorRaise=False, errorLog=True)

Transfer response to a request

Parameters
  1. requester (string, list): requester grid unique name or a list of requesters.

  2. password (string, list): request password or a list of passwords matching the number of requesters

  3. response (object, list): the response to the request. If list, it must be of the same length as requester list

  4. errorRaise (boolean, list): whether to raise an error if request does not exist or any other error occured while looping responses to requesters

  5. errorLog (boolean, list): whether to log if request does not exist or any other error occured while looping responses to requesters

wait_for_response(password, timeout=60, frequency=0.001)

Wait loop for a response for a specific request given the request password

Parameters
  1. password (string, list): request password or list of requests password

  2. timeout (int, float, list): wait timeout or list of timeout having the same length as password list

  3. frequency (float): loop check for response frequency. This is the sleep time between checks

Returns
  1. response (dictionary): responses for requests where keys are passwords. A succesfully received response will have ‘response_received_utctime’ value not None

wait_for_request(password, timeout=60, frequency=0.001)

Wait loop for a response for a specific request given the request password

Parameters
  1. password (string, list): request password or list of requests password

  2. timeout (int, float, list): wait timeout or list of timeout having the same length as password list

  3. frequency (float): loop check for response frequency. This is the sleep time between checks

Returns
  1. request (dictionary): dictionary of requests where keys are passwords. A succesfully received request will have a non None value

release_request(password)

Release sent or received request for response from this executor. This should be used when an old request with a long timeout is no more needed or when a response is received but no need to collect it anymore or when a request for response password needs to be recycled. This method will return running ‘wait_for_request’ and ‘wait_for_response’ blocking methods

Parameters
  1. password (string, list): request for response password. This can be a unique string password or a list of passwords or a list of passwords of the same length as responder list

transfer_release_request(gun, password)

Release sent or received request for response from this or remote executors. This should be used when an old request with a long timeout is no more needed or when a request for response password needs to be recycled. This method will return running ‘wait_for_request’ and ‘wait_for_response’ blocking methods

Parameters
  1. gun (string, list): executor grid unique name or a list of executors grid unique name.

  2. password (string, list): request for response password. This can be a unique string password or a list of passwords or a list of passwords of the same length as responder list

class softgrid.Core.MethodsBase(*args, **kwargs)

Bases: softgrid.Core.RequesterResponderBase

get_executor_grid_path(gridUniqueName, transferTimeout=10, multiplier=3)

recursively fetch for executor grid path given its grid unique name

Parameters
  1. gridUniqueName (string): executor grid unique name

  2. transferTimeout (number): Transfer wait timeout

  3. multiplier (number): Transfer timeout grid path length multiplier

Returns
# result (dictionary): Dictionary where key is the upgrid grid

unique name which located the requested grid path. If empty then given gridUniqueName was not found and therefore not valid.

transfer_add_object(name, obj, receivers, bounce=None, timeout=10, dumps=None, replace=True)

Add object to specific receivers. Object must be serializable using cPickle, pickle or dill dumps method. After transfer, object will be revived using cPickle, pickle or dill loads method.

Parameters
  1. name (string): Object name

  2. obj (object): object that must be pickle, cPickle or dill dumps able

  3. receivers (string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers

  4. bounce (None, str, list): Bounce is used to bounce transfer to multiple receiver after reaching initial receiver. This is ultimately used to transfer to workers that are managed by the same manager. If not None, bounce must be a list of the same length as the normalized number of receivers. bounce list can contain None, grid unique names of sublists of grid unique names.

  5. timeout (None, int, float): Status return timeout. If None is given, status won’t be expected.

  6. dumps (None, string): Dumps method used to serialize. It can be ‘cPickle’, ‘pickle’ or ‘dill’. When None is given ‘cPickle’ then ‘pickle’ and finally ‘dill’ are tried for a successful candiated.

  7. replace (boolean): Whether to replace object when existing

transfer_get_object(name, receivers, timeout=10, default=None, mustExist=True)

Get object from specific remote receivers.

Parameters
  1. name (string): Object name

  2. receivers (string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers

  3. timeout (int, float): Transfer collecting object timeout.

  4. default (object): Default value to return in case data does not exist

  5. mustExist (boolean): Whether data name must exist

Returns
  1. data (object): Data object defined under given name

transfer_pop_object(name, receivers, timeout=10, default=None, mustExist=False)

Pop and return object from specific remote receivers.

Parameters
  1. name (string): Object name

  2. receivers (string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers

  3. timeout (None, int, float): Popped object returned timeout. If None is given, object will be popped remotely but not needed for return.

  4. default (object): Default value to return in case data does not exist

  5. mustExist (boolean): Whether data name must exist

Returns
  1. object (object): Poped object defined under given name

transfer_add_data(name, data, receivers, bounce=None, timeout=10, replace=True)

Add data to specific receivers. This is reserved to simple data structures of python, numpy and pandas. If failed, to transfer for serialization issues, transfer_add_object must be used.

Parameters
  1. name (string): Data name

  2. data (object): Data object

  3. receivers (string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers

  4. bounce (None, str, list): Bounce is used to bounce transfer to multiple receiver after reaching initial receiver. This is ultimately used to transfer to workers that are managed by the same manager. If not None, bounce must be a list of the same length as the normalized number of receivers. bounce list can contain None, grid unique names of sublists of grid unique names.

  5. timeout (None, int, float): Status return timeout. If None is given, status won’t be expected.

  6. replace (boolean): Whether to replace data when existing

transfer_get_data(name, receivers, timeout=10, default=None, mustExist=True)

Get data from specific receivers.

Parameters
  1. name (string): Data name

  2. receivers (string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers

  3. timeout (None, int, float): timeout value to get data from remote executor. If None is given, method will block until data returned.

  4. default (object): Default value to return in case data does not exist

  5. mustExist (boolean): Whether data name must exist

Returns
  1. data (object): Data object defined under given name

transfer_pop_data(name, receivers, default=None, timeout=10, mustExist=False)

Pop and return data from specific receivers.

Parameters
  1. name (string): Data name

  2. receivers (string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers

  3. default (object): Default value to return in case data does not exist

  4. timeout (None, int, float): Popped data returned timeout. If None is given, object will be popped remotely but not needed for return.

  5. mustExist (boolean): Whether data name must exist

Returns
  1. data (object): Poped data object defined under given name

transfer_add_udf(name, func, firstArgExecutor, receivers, timeout=10, replace=True)

Add function to specific receivers

Parameters
  1. name (string): Function name.

  2. func (object): Function object.

  3. firstArgExecutor (boolean): Whether func first argument must be the executor object. Executor object is remote therefore it can’t be passed among fargs or fkwargs. This flag allows the executor executing func to pass self as first argument to func followed by *fargs and **fkwargs

  4. receivers (string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers

  5. timeout (None, int, float): Status return timeout. If None is given, status won’t be expected.

  6. replace (boolean): Whether to replace function when existing

transfer_get_udf(name, receivers, default=None, timeout=10, mustExist=True)

Get data from specific receivers

Parameters
  1. name (string): function name

  2. receivers (string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers

  3. default (object): Default value to return in case function does not exist

  4. mustExist (boolean): Whether data name must exist

Returns
  1. function (object): Function defined under given name

transfer_pop_udf(name, receivers, default=None, timeout=10, mustExist=False)

Pop and return data from specific receivers

Parameters
  1. name (string): Function name

  2. receivers (string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers

  3. default (object): Default value to return in case function does not exist.

  4. timeout (None, int, float): Popped udf returned timeout. If None is given, object will be popped remotely but not needed for return.

  5. mustExist (boolean): Whether function name must exist

Returns
  1. data (object): Poped data object defined under given name

transfer_add_package(path, receivers, timeout=10, sitePackages='temporary', replace=False, rejectFiles='^.*?.(so|pyd|pyc)$', '^(?![a-zA-Z_]).*$', acceptFiles='^[a-zA-Z_]{1,}[a-zA-Z0-9_]{0,}.(py|pyx|h|c|cc|cxx|c\\\\+\\\\+|hh|H|hxx|hpp|h\\\\+\\\\+)$', rejectFolders='^\\\\..*?$', '^__pycache__$', '^docs$', '^Examples$', acceptFolders='^[a-zA-Z_]{1,}[a-zA-Z0-9_]{0,}$', dataFiles=None)

Transfer a local package or module to remote machine site-packages.

Parameters
  1. path (string): Local machine package or module path.

  2. receivers (string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers

  3. timeout (None, int, float): Status return timeout. If None is given, status won’t be expected.

  4. sitePackages (string): must be any of ‘temporary’, ‘permanent’ or ‘user’

  5. replace (boolean): Whether to replace package or module if existing.

  6. rejectFiles (list, tuple, set): List of regular expressions when satisfied file will be ommited. List can be empty if no rejection conditions are needed.

  7. acceptFiles (list, tuple, set): List of regular expressions where only when any is satisfied file will be read ad returned. List can be empty if all files can be accepted.

  8. rejectFolders (list, tuple, set): List of regular expressions when satisfied folder and its content will be ommited. List can be empty if no rejection conditions are needed.

  9. acceptFolders (list, tuple, set): List of regular expressions where only when any is satisfied folder and its content will be walked. List can be empty if all folders can be accepted.

N.B. This is safe to transport python packages with no compiled files. In case compile files are needed unless the remote machine has the same hardware and compiler as the local, all non python files must be compiled on remote machine using setup.py or other commands. In this case it’s safer to limit transered files using the following conditions and launching remote compilation commands.

# possible conditions are
rejectFiles   = (r"^.*?.(so|pyd|pyc)$", r"^(?![a-zA-Z_]).*$",)
acceptFiles   = (r"^[a-zA-Z_]{1,}[a-zA-Z0-9_]{0,}.(py|pyx|h|c|cc|cxx|c\+\+|hh|H|hxx|hpp|h\+\+)$",)
rejectFolders = (r"^\..*?$", r"^__pycache__$", r"^docs$", r"^Examples$")
acceptFolders = (r"^[a-zA-Z_]{1,}[a-zA-Z0-9_]{0,}$",)
class softgrid.Core.AssessBase(*args, **kwargs)

Bases: softgrid.Core.MethodsBase

This method contain all methods to test grid’s performance

assess_transfer_speed(toGUN, fromGUN=None, data=None, number=10, timeout=20, plot=False)

Assess the back and forth speed of data transfer between two executor

Parameter
  1. toGUN (string): Executor grid unique name to transfer data to

  2. fromGUN (None, string): Executor grid unique name to transfer data from. If None, this executor is used.

  3. data (object): Any transferable data object to test the speed of transfer

  4. number (integer): The number of assessment transfers to get a statistical answer

  5. timeout (number): timeout per transfer iteration

  6. plot (boolean): whether to plot results

Returns
  1. assessment (list, Collections.SoftgridStatus): Speed test assessment result list or a Collections.SoftgridStatus if an error happened

# standard library imports
import sys, multiprocessing

# import softgrid orchestrator
from softgrid import Orchestrator

# parameters
NUMBER     = 20                    # number of transfer cycles to get a statistical representation
NO_DATA    = None                  #            (16 bytes)
TINY_DATA  = list(range(1000000))  # 1000,000   (37 MB)
SMALL_DATA = list(range(10000000)) # 10,000,000 (370 MB)
BIG_DATA   = list(range(50000000)) # 50,000,000 (1.85 GB)

# instanciate orchestrator
O = Orchestrator(password='some_password_12345', acceptWorkers=True)

# create workers
ncpu = O.machineNumberOfCores - 1
status, ss,fl = O.launch_worker(upgridUniqueName=O.gridUniqueName,
                                 executorAddress=[O.address]*ncpu,
                                 executorKwargs=None,
                                 machineUser=['some_user']*ncpu)
for s in status:
     print(s)

## ASSESS FROM ORCHESTRATOR TO WORKER WHICH IS A DIRECT PATH
# assess no data
result = O.assess_transfer_speed(toGUN=O.workers[-1], fromGUN=None, data=NO_DATA, number=NUMBER, plot=True)
for r in result:
     print(r)

# assess tiny data
result = O.assess_transfer_speed(toGUN=O.workers[-1], fromGUN=None, data=TINY_DATA, number=NUMBER, plot=True)
for r in result:
     print(r)

# assess small data
result = O.assess_transfer_speed(toGUN=O.workers[-1], fromGUN=None, data=SMALL_DATA, number=NUMBER, plot=True)
for r in result:
     print(r)

# assess big data
result = O.assess_transfer_speed(toGUN=O.workers[-1], fromGUN=None, data=BIG_DATA, number=NUMBER, plot=True)
for r in result:
     print(r)


## ASSESS FROM WORKER TO ANOTHER WHICH IS A 2 STEPS PATH
# assess no data
result = O.assess_transfer_speed(toGUN=O.workers[-1], fromGUN=O.workers[-2], data=NO_DATA, number=NUMBER, plot=True)
for r in result:
     print(r)

# assess tiny data
result = O.assess_transfer_speed(toGUN=O.workers[-1], fromGUN=O.workers[-2], data=TINY_DATA, number=NUMBER, plot=True)
for r in result:
     print(r)

# assess small data
result = O.assess_transfer_speed(toGUN=O.workers[-1], fromGUN=O.workers[-2], data=SMALL_DATA, number=NUMBER, plot=True)
for r in result:
     print(r)

# assess big data
result = O.assess_transfer_speed(toGUN=O.workers[-1], fromGUN=O.workers[-2], data=BIG_DATA, number=NUMBER, plot=True)
for r in result:
     print(r)
_images/assess_transfer.png

Transfer speed between two different executors on the grid. Left column plots show speed of transfer between two executors that are one transfer step away because they are directly connected one to another. The right side column plots show the speed of transfer between two executors that are two transfer steps away because they are seperated by a third executor on the grid.

visualize_grid(*args, **kwargs)

Visualize the grid graph as seen from the calling executor. This helper method will call softgrid.Collections.GridGraph.draw_graph method and return any data this method returns. All given arguments (*args) and keyword arguments (**kwargs) will be passed and used in draw_graph method. The grid graph visual shape and executors positions in the canvas is not guaranteed nor reproducable but the visualization respects all connections and executors role and count. Calling this function multiple time in a row will give different graph shapes but they are all the same and representative of the grid structure and executors connections.

_images/visualize_grid.png

Graph visualization of a grid as seen from the Orchestrator. This grid instance consists of the Orchestrator along with 15 router, 35 manager and 160 worker.

class softgrid.Core.LaunchBase(*args, **kwargs)

Bases: softgrid.Core.AssessBase

In this class all remote and local launch definitons are implemented.

launch_executor(executorAddress, executorType, executorKwargs=None, upgridUniqueName=None, machineUser=None, machineExecutable=None, timeout=10, retry=3)

Launch and connected executor tothe grid.

Parameters
  1. executorAddress (string, list): address where executor will be launched. If list is given it must be a list if executor addresses

  2. executorType (string, list): Executor type to launch. If list is given, must have the same size as executorAddress and all items must be strings of executor type

  3. executorKwargs (None, dict, list): Executor instanciation keyword arguments. If None, all default keyword arguments will be used. If list is given, must have the same size as executorAddress

  4. upgridUniqueName (None, string, list): Launched router upgrid unique name. If None is given, it’s then replaced with current executor grid unique name. If list is given, must have the same size as executorAddress

  5. machineUser (string, list): Executor machine user name. If list is given, must have the same size as executorAddress

  6. machineExecutable (None, string, list): Python machine executable path or alias. If None is given, or a None item is given in the list then it’s replaced with registered grid machines executable path if machine is registered or simply ‘python’ alias.

  7. timeout (number): timeout for launching routers. If exceeded remaining not regiestered routers will forcefully fail to launch.

  8. retry (integer): the number of time to retry launching executor if failure happens

Returns
  1. status (Collections.SoftgridStatus): Status of success or failure

  2. successful (list): list of launch parameters dictionnaries of all successfully launched executors. Dictionaries keys are ‘executorType’, ‘upgridUniqueName’, ‘gridUniqueName’, ‘executorAddress’, ‘machineUser’ and ‘machineExecutable’

  3. failed (list): list of launch parameters dictionnaries of all executors that failed to launch. The same parameters can be used to relaunch the executors. Dictionaries keys are ‘executorType’, ‘upgridUniqueName’, ‘executorAddress’, ‘machineUser’ and ‘machineExecutable’

# standard library imports
from __future__ import print_function
import sys

# softgrid imports
from softgrid import Orchestrator, Globals

# create orchestrator
O = Orchestrator(password='nopassword', acceptWorkers=False)

## launch four local routers
status, routers,fl = O.launch_router(upgridUniqueName=O.gridUniqueName,
                                     executorAddress=[O.address]*4,
                                     executorKwargs=None,
                                     machineUser='some_user')
for s in status:
    print(s)
launch_router(*args, **kwargs)

alias to ‘launch_executor’ with executor type as ‘Router’

launch_manager(*args, **kwargs)

alias to ‘launch_executor’ with executor type as ‘Manager’

launch_worker(*args, **kwargs)

alias to ‘launch_executor’ with executor type as ‘Worker’

class softgrid.Core.ExecutorBase(name=None, applicationName='softgrid_application', address=None, port=5000, portFetch=True, password=<function ExecutorBase.<lambda>>, allowExecutingUDF=None, relaunch=True, acceptRouters=True, acceptManagers=True, acceptWorkers=True, temporaryDir=None, permanentDir=None, ddfsDir=None, logKwargs=None, *args, **kwargs)

Bases: softgrid.Core.LaunchBase

This is the former definition of all softgrid executors. This can be launched on a remote machine as well as on local. Orchestrator, Router, Manager and Worker implementations all inhert ExecutorBase.

Parameters
  1. name (None, string): grid executor name. If None, name will be set to the executor class name

  2. applicationName (string): Application name

  3. address (None, string): Machine address. If None, then it will get automatically assigned the ip address of the local machine

  4. port (integer): The port number of the object server

  5. portFetch (boolean): Whether to fetch for open port in case given port is not available

  6. password (callable, string): grid executor server communication password

  7. allowExecutingUDF (boolean): Whether to allow executing user defined function by this grid executor exxecution

  8. acceptRouters (boolean): Whehter to accept router downgrid

  9. acceptManagers (boolean): Whehter to accept manager downgrid

  10. acceptWorkers (boolean): Whehter to accept worker downgrid

  11. rules (None, dict): Machine default rules

  12. logKwargs (None, dict): Logger keyword arguments

property heartMonitorFrequency

Heart beats monitor checking frequency

property heartbeatTolerance

Heart beats monitor missing tolerance time

property relaunch

Get relaunch flag or integer value

property executorsTransport

Executors transport instance

property isOrchestrator

Whether this is an orchestrator

property isRouter

Whether this is an router

property isManager

Whether this is an manager

property isWorker

Whether this is an worker

property allowExecutingUDF

Whether It’s allowed to execute user defined functions

property machineProperties

Get machine properties dictionary

property softgridProperties

Get softgrid properties dictionary

property executorProperties

Get executor properties dictionary

property acceptRouters
property acceptManagers
property acceptWorkers
destroy_bridge(bridgeToGUN, bridgePassword)

Destroy connection bridge to bridgeToGUN

Parameters
  1. bridgeToGUN (string): Executor grid unique name to destroy connection bridge to.

  2. bridgePassword (string): Connection bridge password.

Returns
  1. status (Collections.SoftgridStatus): Status of success or failure

build_bridge(bridgeToGUN, bridgeFromGUN=None, timeout=10)

Build connection bridge from bridgeFromGUN to bridgeToGUN

Parameters
  1. bridgeToGUN (string): Grid unique name of the executor to build the connection bridge to.

  2. bridgeFromGUN (None, string): Grid unique name of the executor to build the connection bridge from.

  3. timeout (number): Timeout parameter to build the bridge

Returns
  1. status (Collections.SoftgridStatus): Status of success or failure

from __future__ import print_function
import sys
from softgrid import Collections, Orchestrator, Globals

## allow machine to accept any number of managers
Globals.RULES['machine_maximum_number_of_managers'] = None
## allow machine to accept any number of workers
Globals.RULES['machine_maximum_number_of_workers']  = None

## launch orchestrator
O = Orchestrator(password='nopassword')

## launch 3 managers attached to orchestrator
status,managers,_ = O.launch_manager(upgridUniqueName=O.gridUniqueName,
                                     executorAddress=[O.address]*3,
                                     machineUser='some_user',
                                     timeout=20)

# get managers gridUniqueName
managers = [e['gridUniqueName'] for e in managers]

## launch 5 workers on attached to each manager for a total of 15 workers
status,workers,_ = O.launch_worker(upgridUniqueName=managers*5,
                                   executorAddress=[O.address]*len(managers)*5,
                                   machineUser='some_user',
                                   timeout=20)

# get workers gridUniqueName
workers = [e['gridUniqueName'] for e in workers]

## build connection bridges between workers
O.build_bridge(O.workers[0], O.workers[14])
O.build_bridge(O.workers[2], O.workers[7])
O.build_bridge(O.workers[5], O.workers[12])
O.build_bridge(O.workers[8], O.workers[10])

## visualize grid graph as seen from orchestrator
O.visualize_grid(labelsFormat='${role}\n${pid}')
_images/build_bridge.png

Grid graph as seen from Orchestrator. Grid Executors are visualized as spheres color coded by role (red:orchestrator, blue:manager, yellow:worker). Grid connections are drawn with solid black line while connection bridges are drawn with the dotted red line. e.g. without the bridge, transfer from worker15748 to worker15753 will first go through manager15730 to orchestrator15727 to manager15729 before reaching destination. The bridge connection built between worker15748 and worker15753 shortcuts all intermediate grid executors. This is very useful when two executors are expected to exchange and transfer frequently

connect_to_grid(address, port, password, handshakeTrackingKey=None, blocking=False, timeout=20)

Connect executor to another on the grid. This will raise an error if this executor is already connected to the grid.

Parameters
  1. address (str): grid server machine address this grid executor will be connecting to

  2. port (int): server port

  3. password (str): server connection password

  4. blocking (bool): If True, infinite loop will be launched and this will block user interface. If False, infinite loop will be launched automatically if Globals.INTERACTIVE_MODE is True

  5. timeout (int, float): timeout in seconds to successfuly connect with grid. This is used only if Globals.INTERACTIVE_MODE is False and will result in killing the process

add_promise(creatorGUN, promisePassword=None, promiseTimeout=60, force=True)

Add a promise to scheduler execution queue. A promise is a place holder to receiving an execute transfer from a remove executor. If placed, a promise will block any other scheduled execution from running until all stacked executions in the promise queue are finished or promise timeout is exhausted. As long as the promise is alive, remote executors can transfer executions to the promise by simply using the promise password Multiple promises can be created but at the risk of exhausting timeout if another task or promise is running. It’s to be noted that promises have no priority and they are used on first come first served basis. But executions in a promise queue will be prioritised.

Parameters
  1. creatorGUN (str): Executor creator grid unique name

  2. promisePassword (None, str): Promise password. If None, a unique password will be automatically created.

  3. promiseTimeout (int, float): Promise timeout in the execution queue.

  4. force (bool): Place a promise regardless whether execution queue is running a task or not. If True, promise will be placed regardless at the risk of exhausting timeout constraint before running execution finishes.

Returns
  1. status (dict): The reservation status. Keys are:

    • ‘placed’: Whether the promise has been placed in the execution queue. If force argument is False and their is already an execution running then the promise will not be placed.

    • ‘queue_length’: The number of current execution stacked in the execution queue.

    • ‘number_of_existing_promises’: The number of already existing promises in the queue.

    • ‘number_of_running_executions’: The number of currently

      running executions.

    • ‘promise_create_time’: Promise create utc time. If not successfully placed, None is the value.

    • ‘promise_password’: In case promise is placed, this password must be used to transfer executions to this promise queue. If not successfully placed, None is the value.

remove_promise(promisePassword, force=False)

Remove a promise from the execution queue.

Parameters
  1. promisePassword (str): Promise password.

  2. force (str): Whether to force removing promise even if execution are remaining in promise queue.

Returns
  1. status (dict): Dictionary of remove status.

    • ‘removed’: Whether the promise has been successfully removed

    • ‘remaining_queue_length’: The number of executions

      stacked in the execution queue of the promise.

transfer_remove_promise(workerGUN, promisePassword, force=False)

Remove a promise from the execution queue of a remove worker.

Parameters
  1. workerGUN (string, list): Remote worker grid unique name or list of workers grid unique name

  2. promisePassword (str): Promise password.

  3. force (str): Whether to force removing promise even if execution are remaining in promise queue.

Returns
  1. status (SoftgridStatus, list): promise remove result. If an error occured SoftgridStatus will be returned. Otherwise removing status dictionary is returned:

    • ‘removed’: Whether the promise has been successfully removed

    • ‘remaining_queue_length’: The number of executions

      stacked in the execution queue of the promise.

transfer_add_promise(workerGUN, promisePassword=None, promiseTimeout=60, force=True)

Transfer a promise to execute to a remote worker.

Parameters
  1. workerGUN (list, string): Remote worker grid unique name to send promise to. It can be a list of workers too

  2. promisePassword (None, str): Promise password. If None, a unique password will be automatically created.

  3. promiseTimeout (int, float): Promise timeout in the execution queue.

  4. force (bool): Place a promise regardless whether execution queue is running a task or not. If True, promise will be placed regardless at the risk of exhausting promiseTimeout constraint before running execution finishes.

Returns
  1. status (SoftgridStatus, list): promise result status. List items can be SoftgridStatus if an error occured, otherwise reservation status dictionary:

    • ‘placed’: Whether the promise has been placed in the execution queue. If force argument is False and their is already an execution running then the promise will not be placed.

    • ‘queue_length’: The number of current execution stacked in the execution queue.

    • ‘number_of_existing_promises’: The number of already existing promises in the queue.

    • ‘number_of_running_executions’: The number of currently

      running executions.

    • ‘promise_create_time’: Promise create utc time. If not successfully placed, None is the value.

    • ‘promise_password’: In case promise is placed, this password must be used to transfer executions to this promise queue. If notsuccessfully placed, None is the value.

# standard library imports
import time, uuid, sys
from pprint import pprint

# softgrid imports
from softgrid import Orchestrator

# create some funky password
t = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime(time.time()))
s = str(uuid.uuid1())
PROMISE_PASSWORD = "%s_%s"%(t,s)

# create orchestrator
O = Orchestrator(password='some_password_12345', acceptWorkers=True)

# create local workers
ncpu = O.machineNumberOfCores - 1
status, ss,fl = O.launch_worker(upgridUniqueName=O.gridUniqueName,
                                executorAddress=[O.address]*ncpu,
                                executorKwargs=None,
                                machineUser=['some_user']*ncpu)
for s in status:
    print(s)

# create job execution promise on all workers
promises = O.transfer_add_promise(workerGUN=O.workers, promisePassword=PROMISE_PASSWORD, promiseTimeout=60, force=True)

# print returned promises
if not isinstance(promises, list):
    # failed to transfer at all
    print(promises)
else:
    # print failed promises
    for p in promises:
        if not isinstance(p, dict):
            print(p)
    # print successful promises
    for p in promises:
        if isinstance(p, dict):
            pprint(p)

# IN 1 MIN, ALL PROMISES WILL EXPIRE BECAUSE promiseTimeout IS SET TO 60.
# ORCHESTRATOR WILL RECEIVE A TIMEOUT LOG ABOUT PROMISES ON REMOTE WORKERS
# EXPIRING AFTER BEING IDLE FOR 60 SECONDS.
class softgrid.Core.TaskScheduler

Bases: object

Task scheduler is a seperate implementation that can be inherited by ExecutorBase children classes. It has all needed methods to schedule and launch tasks.

schedule_task(func, fargs=None, fkwargs=None, worker=None, priority=1, promisePassword=None, collect=True, returnResult=True, storeResult=False, resultName=None, taskTimeout=None, multiplier=9223372036854775807)

Schedule a task execute on a worker. When worker is busy with other scheduled and tasks, the executions will launch in a priority basis.

Parameters
  1. func (str, dict, callable): Function to execute.

  2. fargs (None, list): Function func list of arguments.

  3. fkwargs (None, list): Function func dictionary of keyword arguments.

  4. worker (None, str): Worker grid unique name. If None, a worker will be randomly picked.

  5. priority (int): Task priority in the scheduler queue.

  6. promisePassword (None, str): Promise password. If not None, the task will be sent to the promise queue instead of the main scheduler queue.

  7. collect (bool): Whether to collect and return execution result. This will block user interface until remote execution returns.

  8. returnResult (bool): Whether to return result to local excutor. In contrast with ‘collect’, returnResult will make sure executed result is returned to this executor and stored in executedReturned dictionary if resultName is not None otherwise logged.

  9. storeResult (bool): Whether to store result on remote executor executedStored dictionary.

  10. resultName (None, str): Result name to store in local and remote worker executor. If None, storeResult will be automatically set to False and returned result will be logged instead of stored.

  11. taskTimeout (None, number): Whether to restrain remote execution with a timeout. If set and expried, remote execution will continue but locally transfer tracking will be dropped.

  12. multiplier (number): timeout multiplier. Wait timeout is calculated as timeout * ceil( pathLength/multiplier )

Returns
# result (None, list): If collect is None, then None is returned,

otherwise a list of 1 item which is the transfer returned result.

create_job_promise(ntasks=None, workers=None, nworkersMin=None, promisePassword=None, promiseTimeout=60, transferTimeout=10, force=False, verbose=False)

This will add promises with the same password on multiple workers. A job is a collection of tasks. To ensure tasks and execution continuity or simultaneity, promises must be setup on multiple workers.

Parameters
  1. ntasks (None, integer): The number of tasks required to do the job. If not anticipated, or an unknown amount of tasks is expected then ntasks shall be kept None. If set to a number, ntasks is the maximum amount of promises created regardless of the number of workers available.

  2. workers (None, list): List of workers to create the promise on. If None, then all downgrid workers are candidated for creating a promise.

  3. nworkersMin (None, list): The minimum number of successful promises accepted.

  4. promisePassword (None, str): The promise password. If None, password will be created automatically, otherwise it must be a string. If any promise with the same password exist on a worker then setting a new promise on that worker will fail.

  5. promiseTimeout (number): The promise timeout in the execution queue.

  6. transferTimeout (None, number): Transfer promise timeout. This will be used to ensure all promises are set or fail to set within a timeout constraint. if None is given it will default to promiseTimeout.

  7. force (bool): Whether to force creating a promise regardless of the amount of executions and promises in the queue.

Retuns
  1. statuses (SoftgridStatus, list): the successful promises list. If creating the promises fail, SoftgridStatus is returned.

import random
ntasks   = 5
statuses = E.create_job_promise(ntasks=ntasks, workers=None, nworkersMin=ntasks, promiseTimeout=60)
if not isinstance(statuses, list):
    print(statuses)
else:
    def func(wait, n):
        import time, random
        tic = time.time()
        while time.time()-tic<wait:
            continue
        return [random.random() for _ in range(n)]

    for i in range(ntasks):
        password = statuses[i]['promise_password']
        worker   = statuses[i]['promise_executor']
        E.schedule_task(func=func, worker=worker, promisePassword=password,
                        fkwargs={'wait':30, 'n':random.randint(1000,10000)}, collect=False,
                        returnResult=True, storeResult=False, resultName='random_range')

        # After about 30 seconds results shall be returned from executors
        # and retrieved using E._executedReturned['random_range']
remove_job_promise(promises, force=True, verbose=True)

Remove job promise from executors.

Parameters
  1. promises (string, list, dict): promises password or a list of dictionaries promises statuses

  2. force (bool): Whether to force removing promise from executors regardless of existing executions in the promise queue.

  3. verbose (bool): Whether to be verbose

Returns
  1. results (list): List of removing promises SoftgridStatus statuses.

schedule_job(funcs, collect=True, resultName=None, jobTimeout=None, jobPromiseKwargs=None, verbose=True)

Schedule and launch an asynchronous job of multiple tasks.

Parameters
  1. funcs (string, dict, callable, list): List of tuples of three items where first item is a function and the second item is None or a list arguments and the third item is None or a dictionary of keyword arguments. Every tuple in the list is a task that will be be scheduled to run as an independant task. funcs can be also a string method name, a dictionary function source or a callable and this will be a single task.

  2. collect (bool): Whether to collect the data. This will block user’s execution until function returns the result.

  3. resultName (None, str): If given result will be stored in local executor _executedReturned dictionary.

  4. jobTimeout (None, number): Timeout to finish executing job

  5. jobPromiseKwargs (None, dict): create_job_promise method keyword arguments.

  6. verbose (bool): Whether to be verbose in the execution.

Returns
  1. values (None, list): List of collected values or None when collect is False.

get_executors_bridges(receivers=None, timeout=20, keepDuplicates=False)

get list of bridges pair tuples

Parameters
  1. receivers (None, string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Receivers list will be normalized and expanded whether keywords are used or not. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers If None is given, receivers will be automatically set to all routers, managers and workers

  2. timeout (None, number): transfer result return timeout.

  3. keepDuplicates (boolean): whether remove duplicates in bridges coming from different executors.

Returns
  1. errors (list): list of Collections.SoftgridStatus

  2. bridges (dictionary): dictionary of bridges where keys are executors grid unique name and values are list of

softgrid.Executors

class softgrid.Executors.Worker(address=None, port=5001, relaunch=True, *args, **kwargs)

Bases: softgrid.Core.ExecutorBase

Worker executor implementation

property isWorker

Whether this is a worker executor. This will always return True

property role

This executor role, this will return ‘worker’

class softgrid.Executors.Manager(address=None, port=5000, relaunch=10, allowExecutingUDF=None, *args, **kwargs)

Bases: softgrid.Core.ExecutorBase

Manager executor implementation

property launchScripts

Helper property that prints launching script of downgrids for this Manager instance

property isManager

Whether this is a manager executor. This will always return True

property role

This executor role, this will return ‘manager’

class softgrid.Executors.Router(address=None, port=5001, relaunch=5, allowExecutingUDF=None, *args, **kwargs)

Bases: softgrid.Core.ExecutorBase

Router executor implementation

property launchScripts

Helper property that prints launching script of downgrids for this Router instance

property isRouter

Whether this is a router executor. This will always return True

property role

This executor role, this will return ‘router’

class softgrid.Executors.Orchestrator(address=None, port=5000, allowExecutingUDF=False, heartMonitorFrequency=10, heartbeatTolerance=60, acceptManagers=None, acceptWorkers=None, *args, **kwargs)

Bases: softgrid.Core.ExecutorBase, softgrid.Core.TaskScheduler

Orchestrator executor implementation

property launchScripts

Helper property that prints launching script of downgrids for this Router instance

property role

This executor role, this will return ‘orchestrator’

property isOrchestrator

Whether this is a orchestrator executor. This will always return True

property gridPath

overload to gridpath property that returns a list of item which is the orchestrator gridUniqueName

connect_to_grid(*args, **kwargs)

Orchestrator is the main node of the grid. It connects to nothing

kill_executor(receivers)

Kill executors

Parameters
  1. receivers (string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Receivers list will be normalized and expanded whether keywords are used or not. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers

garbage_collect(receivers=None)

Send a garbage collect execution to executors

Parameters
  1. receivers (None, string, list): string keyword or a valid executor grid unique name or a list of valid transfer trajectory of valid connected executors grid unique name or a list of a any combination of the listed before. Receivers list will be normalized and expanded whether keywords are used or not. Accepted string keywords are: * upgrid: current executor upgrid * routers: all current executor downgrid routers * managers: all current executor downgrid managers * workers: all current executor downgrid workers If None is given, receivers will be automatically set to all routers, managers and workers

softgrid.DDFS

class softgrid.DDFS.DDFSETL(dtypes, sep='\x01', null=None, colNames=None)

Bases: object

Distributed Data File System Extract Transform Load

Parameters
  1. dtypes (dtype, list): data file columns data types (str,int,float,list,tuple,dict). If one data type is given then all columns are considered of this data type. If a list of data types is given, then this will define the data number of columns as well as every and each column specific type

  2. sep (str): Data seperator to dump or load from disk

  3. null (object): The designated null value. On disk it’s always and empty string.

dump(path, data, header=None, replace=True)

Dump data on disk.

Parameters
  1. path (str): Relative path to executor ddfs’s where the data must be stored.

  2. data (list, numpy.ndarray): list of rows or a numpy array for dimension 2.

  3. replace (boolean): Whether to replace existing file.

pull(path)

Pull and return data from Distributed Data File System

Parameters
  1. path (string): Relative path to executor ddfs’s where the data is stored.

Returns
  1. header (list): List of columns name.

  2. data (list): Data list of rows where each row is a list of columns.

iter(path, n=1, header=False)

Iterate file n lines at a time

Parameters
  1. path (string): Relative path to executor ddfs’s where the data is stored.

  2. n (integer): number of rows to return in each iteration. If n==1 then one row is returned. If n>1 then each iteration will return a list of rows.

  3. header (boolean): Whether to return header columns name in each iteration

Returns
  1. yielded (tuple, list): If header is True, yielded is a tuple of header columns name and n rows list. If header is False, then yielded will be a list of n rows or a single row depending on whether n is equal to 1 or bigger.

softgrid.Addons

softgrid addons that can be used to help to enable other libraries.

class softgrid.Addons.RepositoryTransfer

Bases: object

Static helper methods to transfer pyrep repositories across the grid

static get_repository_transfer_kwargs(localRepo, paths, remoteRepo, remoteRepoInfo=None, remoteRepoReplace=False, remoteRepoReplaceFiles=True)

Get transfer kwargs ready of pyrep repository files. This can be used to transfer and create pyrep repository on remote machines.

Parameters
  1. localRepo (Repository, string): pyrep repository instance or path to a valid instance.

  2. paths (string, list): List of files relative path in local repository to load files ready to transfer.

  3. remoteRepo (tuple, string): Path to repository on remote machine. If repository is not found under given path it will be created. If tuple is given, it must be of length 2 where first item is a keyword (‘temporary’,’permanent’) and the second item is relative path to given keyword.

  4. remoteRepoInfo (bool, object): Repository information this will be used in case remote repository doesn’t exist. It can be None or any json writable type of data. If None, localRepo info will be used.

  5. remoteRepoReplace (boolean): Whether to replace remote repository in case existing. If false and another repository exists then an error will be raised.

  6. remoteRepoReplaceFiles (boolean): Whether to replace remote existing repository files. If False and file exists, an error will be generated.

Returns
  1. get_repository_transfer_kwargs (dict): Transfer kwargs dictionary.

paths = [p[0] for p in repo.walk_files_info(relativePath='',fullPath=False, recursive=True)]
tkw   = Addons.RepositoryTransfer.get_repository_transfer_kwargs(localRepo=repo,
                                                                 remoteRepo='repo_path', remoteRepoInfo=True,
                                                                 remoteRepoReplace=True, paths=paths)
# transfer from orchestrator (O) to a manager
c = O.transfer(transferKwargs=tkw, receivers=O.managers[-1], collect=True)
print(c[0])
static dump(executor, repoPath, repoInfo, repoReplace, files, replaceFiles)

Dump given files dictionary to repository.

Parameters
  1. executor (softgrid Executor): Executor instance.

  2. repoPath (string): Repository path string. If pyrep repository is not found then it will be created.

  3. repoInfo (None, object): Repository information this will be used in case repository doesn’t exist. It can be None or any json writable type of data.

  4. repoReplace (boolean): Whether to replace repository in case existing. If false and another repository exists then an error will be raised.

  5. files (dict): Files dictionary of repository relative path where values are a dictionary of all of ‘pull’, ‘dump’ methods along with ‘data’ in a serialized format using pickle.dumps

  6. replaceFiles (boolean): Whether to replace existing file in repository if existing.

Returns
  1. status (softgrid.Collections.SoftgridStatus): SoftgridStatus status.