Skip to content

Network Flow

Bases: ComponentManager, Agent

Class that represents a network flow.

Source code in edge_sim_py/components/network_flow.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
class NetworkFlow(ComponentManager, Agent):
    """Class that represents a network flow.

    A flow moves ``data_to_transfer`` units of data along ``path``. At creation it registers itself
    in the "active_flows" list of every link between consecutive path nodes, and at each call to
    ``step()`` it consumes the smallest bandwidth among its links until no data is left.
    """

    # Class attributes that allow this class to use helper methods from the ComponentManager
    _instances = []
    _object_count = 0

    def __init__(
        self,
        obj_id: int = None,
        topology: object = None,
        status: str = "active",
        source: object = None,
        target: object = None,
        start: int = 0,
        path: list = None,
        data_to_transfer: int = 0,
        metadata: dict = None,
    ) -> None:
        """Creates a NetworkFlow object.

        Args:
            obj_id (int, optional): Object identifier. Defaults to None, in which case the class-level
                object counter is used.
            topology (object, optional): Network topology, indexed as topology[node_a][node_b] to obtain
                a link. Defaults to None.
            status (str, optional): Flow status. Defaults to "active".
            source (object, optional): Node where the flow starts. Defaults to None.
            target (object, optional): Node where the flow ends. Defaults to None.
            start (int, optional): Time step in which the flow started. Defaults to 0.
            path (list, optional): Network path used to pass the flow over the list of network nodes.
                Defaults to None, which creates a new empty list.
            data_to_transfer (int, optional): Amount of data the flow still has to transfer. Defaults to 0.
            metadata (dict, optional): Custom flow metadata. Defaults to None, which creates a new empty dict.
        """
        # Adding the new object to the list of instances of its class
        self.__class__._instances.append(self)

        # Object's class instance ID
        self.__class__._object_count += 1
        if obj_id is None:
            obj_id = self.__class__._object_count
        self.id = obj_id

        # Reference to the network topology object
        self.topology = topology

        # Flow status. Valid options: "active" (default) and "finished"
        self.status = status

        # Network nodes and path used by the flow. A fresh list is created when no path is given so that
        # flows built with the default never share (and accidentally mutate) the same list object
        self.source = source
        self.target = target
        self.path = [] if path is None else path

        # Network capacity available to the flow
        self.bandwidth = {}
        self.last_updated_bandwidth = {}

        # Temporal information about the flow
        self.start = start
        self.end = None

        # Amount of data transferred by the flow
        self.data_to_transfer = data_to_transfer

        # Custom flow metadata (a fresh dict per flow when none is given, for the same reason as "path")
        self.metadata = {} if metadata is None else metadata

        # Adding a reference to the flow inside the network links that comprehend the "path" attribute
        for hop_start, hop_end in zip(self.path[:-1], self.path[1:]):
            link = self.topology[hop_start][hop_end]
            link["active_flows"].append(self)
            self.bandwidth[link["id"]] = None
            self.last_updated_bandwidth[link["id"]] = None

        # Model-specific attributes (defined inside the model's "initialize()" method)
        self.model = None
        self.unique_id = None

    def _to_dict(self) -> dict:
        """Method that overrides the way the object is formatted to JSON.

        Returns:
            dict: JSON-friendly representation of the object as a dictionary.
        """
        # NOTE(review): "self.nodes" is not assigned anywhere in this class; unless a base class or an
        # external caller sets it, building the "nodes" entry raises AttributeError -- confirm.
        dictionary = {
            "id": self.id,
            "status": self.status,
            "nodes": [{"type": type(node).__name__, "id": node.id} for node in self.nodes],
            "path": self.path,
            "start": self.start,
            "end": self.end,
            "data_to_transfer": self.data_to_transfer,
            "bandwidth": self.bandwidth,
            "metadata": self.metadata,
        }
        return dictionary

    def collect(self) -> dict:
        """Method that collects a set of metrics for the object.

        Returns:
            metrics (dict): Object metrics. "Actual Bandwidth" is the smallest bandwidth among the flow's
                links, or None when the flow has no links or any link bandwidth is still undefined.
        """
        bw = list(self.bandwidth.values())

        # min() raises ValueError on an empty list, so a flow without links reports no actual bandwidth
        actual_bw = min(bw) if bw and all(value is not None for value in bw) else None

        # Flows created with the default (empty) metadata have neither "type" nor "object"
        flow_type = self.metadata.get("type")
        transferred = self.metadata.get("object")

        if flow_type == "layer":
            object_being_transferred = f"{str(transferred)} ({transferred.instruction})"
        else:
            object_being_transferred = str(transferred)

        metrics = {
            "Instance ID": self.id,
            "Object being Transferred": object_being_transferred,
            "Object Type": flow_type,
            "Start": self.start,
            "End": self.end,
            "Source": self.source.id if self.source else None,
            "Target": self.target.id if self.target else None,
            "Path": [node.id for node in self.path],
            "Links Bandwidth": bw,
            "Actual Bandwidth": actual_bw,
            "Status": self.status,
            "Data to Transfer": self.data_to_transfer,
        }
        return metrics

    def step(self):
        """Method that executes the events involving the object at each time step."""
        if self.status != "active":
            return

        # Updating the flow progress according to the available bandwidth. Progress is only made when the
        # flow has at least one link and every link already has a bandwidth defined (min() of an empty
        # sequence would raise ValueError)
        link_bandwidths = list(self.bandwidth.values())
        if link_bandwidths and all(value is not None for value in link_bandwidths):
            self.data_to_transfer -= min(link_bandwidths)

        if self.data_to_transfer > 0:
            return

        # Updating the completed flow's properties
        self.data_to_transfer = 0

        # Storing the current step as when the flow ended
        self.end = self.model.schedule.steps + 1

        # Updating the flow status to "finished"
        self.status = "finished"

        # Releasing links used by the completed flow
        for hop_start, hop_end in zip(self.path[:-1], self.path[1:]):
            link = self.model.topology[hop_start][hop_end]
            link["active_flows"].remove(self)

        # Flows without a "type" entry in their metadata need no type-specific completion handling
        flow_type = self.metadata.get("type")

        # When container layer flows finish: Adds the container layer to its target host
        if flow_type == "layer":
            # Removing the flow from its target host's download queue
            self.target.download_queue.remove(self)

            # Adding the layer to its target host
            layer = self.metadata["object"]
            layer.server = self.target
            self.target.container_layers.append(layer)

        # When service state flows finish: change the service migration status
        elif flow_type == "service_state":
            service = self.metadata["object"]
            service._Service__migrations[-1]["status"] = "finished"

__init__(obj_id=None, topology=None, status='active', source=None, target=None, start=0, path=[], data_to_transfer=0, metadata={})

Creates a NetworkFlow object.

Parameters:

Name Type Description Default
obj_id int

Object identifier. Defaults to None.

None
topology object

Network topology. Defaults to None.

None
status str

Flow status. Defaults to "active".

'active'
source object

Node where the flow starts. Defaults to None.

None
target object

Node where the flow ends. Defaults to None.

None
start int

Time step in which the flow started. Defaults to 0.

0
path list

Network path used to pass the flow over the list of network nodes. Defaults to [].

[]
data_to_transfer int

Amount of data the flow still has to transfer. Defaults to 0.

0
metadata dict

Custom flow metadata. Defaults to {}.

{}

Returns:

Name Type Description
object object

Created NetworkFlow object.

Source code in edge_sim_py/components/network_flow.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def __init__(
    self,
    obj_id: int = None,
    topology: object = None,
    status: str = "active",
    source: object = None,
    target: object = None,
    start: int = 0,
    path: list = None,
    data_to_transfer: int = 0,
    metadata: dict = None,
) -> None:
    """Creates a NetworkFlow object.

    Args:
        obj_id (int, optional): Object identifier. Defaults to None, in which case the class-level
            object counter is used.
        topology (object, optional): Network topology, indexed as topology[node_a][node_b] to obtain
            a link. Defaults to None.
        status (str, optional): Flow status. Defaults to "active".
        source (object, optional): Node where the flow starts. Defaults to None.
        target (object, optional): Node where the flow ends. Defaults to None.
        start (int, optional): Time step in which the flow started. Defaults to 0.
        path (list, optional): Network path used to pass the flow over the list of network nodes.
            Defaults to None, which creates a new empty list.
        data_to_transfer (int, optional): Amount of data the flow still has to transfer. Defaults to 0.
        metadata (dict, optional): Custom flow metadata. Defaults to None, which creates a new empty dict.
    """
    # Adding the new object to the list of instances of its class
    self.__class__._instances.append(self)

    # Object's class instance ID
    self.__class__._object_count += 1
    if obj_id is None:
        obj_id = self.__class__._object_count
    self.id = obj_id

    # Reference to the network topology object
    self.topology = topology

    # Flow status. Valid options: "active" (default) and "finished"
    self.status = status

    # Network nodes and path used by the flow. A fresh list is created when no path is given so that
    # flows built with the default never share (and accidentally mutate) the same list object
    self.source = source
    self.target = target
    self.path = [] if path is None else path

    # Network capacity available to the flow
    self.bandwidth = {}
    self.last_updated_bandwidth = {}

    # Temporal information about the flow
    self.start = start
    self.end = None

    # Amount of data transferred by the flow
    self.data_to_transfer = data_to_transfer

    # Custom flow metadata (a fresh dict per flow when none is given, for the same reason as "path")
    self.metadata = {} if metadata is None else metadata

    # Adding a reference to the flow inside the network links that comprehend the "path" attribute
    for hop_start, hop_end in zip(self.path[:-1], self.path[1:]):
        link = self.topology[hop_start][hop_end]
        link["active_flows"].append(self)
        self.bandwidth[link["id"]] = None
        self.last_updated_bandwidth[link["id"]] = None

    # Model-specific attributes (defined inside the model's "initialize()" method)
    self.model = None
    self.unique_id = None

collect()

Method that collects a set of metrics for the object.

Returns:

Name Type Description
metrics dict

Object metrics.

Source code in edge_sim_py/components/network_flow.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def collect(self) -> dict:
    """Method that collects a set of metrics for the object.

    Returns:
        metrics (dict): Object metrics. "Actual Bandwidth" is the smallest bandwidth among the flow's
            links, or None when the flow has no links or any link bandwidth is still undefined.
    """
    bw = list(self.bandwidth.values())

    # min() raises ValueError on an empty list, so a flow without links reports no actual bandwidth
    actual_bw = min(bw) if bw and all(value is not None for value in bw) else None

    # Flows created with the default (empty) metadata have neither "type" nor "object"
    flow_type = self.metadata.get("type")
    transferred = self.metadata.get("object")

    if flow_type == "layer":
        object_being_transferred = f"{str(transferred)} ({transferred.instruction})"
    else:
        object_being_transferred = str(transferred)

    metrics = {
        "Instance ID": self.id,
        "Object being Transferred": object_being_transferred,
        "Object Type": flow_type,
        "Start": self.start,
        "End": self.end,
        "Source": self.source.id if self.source else None,
        "Target": self.target.id if self.target else None,
        "Path": [node.id for node in self.path],
        "Links Bandwidth": bw,
        "Actual Bandwidth": actual_bw,
        "Status": self.status,
        "Data to Transfer": self.data_to_transfer,
    }
    return metrics

step()

Method that executes the events involving the object at each time step.

Source code in edge_sim_py/components/network_flow.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def step(self):
    """Method that executes the events involving the object at each time step."""
    if self.status != "active":
        return

    # Updating the flow progress according to the available bandwidth. Progress is only made when the
    # flow has at least one link and every link already has a bandwidth defined (min() of an empty
    # sequence would raise ValueError)
    link_bandwidths = list(self.bandwidth.values())
    if link_bandwidths and all(value is not None for value in link_bandwidths):
        self.data_to_transfer -= min(link_bandwidths)

    if self.data_to_transfer > 0:
        return

    # Updating the completed flow's properties
    self.data_to_transfer = 0

    # Storing the current step as when the flow ended
    self.end = self.model.schedule.steps + 1

    # Updating the flow status to "finished"
    self.status = "finished"

    # Releasing links used by the completed flow
    for hop_start, hop_end in zip(self.path[:-1], self.path[1:]):
        link = self.model.topology[hop_start][hop_end]
        link["active_flows"].remove(self)

    # Flows without a "type" entry in their metadata need no type-specific completion handling
    flow_type = self.metadata.get("type")

    # When container layer flows finish: Adds the container layer to its target host
    if flow_type == "layer":
        # Removing the flow from its target host's download queue
        self.target.download_queue.remove(self)

        # Adding the layer to its target host
        layer = self.metadata["object"]
        layer.server = self.target
        self.target.container_layers.append(layer)

    # When service state flows finish: change the service migration status
    elif flow_type == "service_state":
        service = self.metadata["object"]
        service._Service__migrations[-1]["status"] = "finished"