Source code for leads.comm.server.server
from threading import Thread as _Thread
from typing import override as _override
from leads.comm.prototype import Entity, Connection, Callback
from leads.os import _thread_flags
[docs]
class Server(Entity):
"""
You should use `create_server()` and `start_server()` instead of directly calling any method.
"""
def __init__(self, port: int, callback: Callback, separator: bytes) -> None:
"""
:param port: the port on which the server listens
:param callback: the callback interface
:param separator: the symbol that splits the stream into messages
"""
super().__init__(port, callback)
self._connections: list[Connection] = []
self._separator: bytes = separator
[docs]
def num_connections(self) -> int:
"""
Get the number of active connections.
:return: the number of connections
"""
return len(self._connections)
[docs]
def remove_connection(self, connection: Connection) -> None:
"""
Remove the connection from the list.
:param connection: the connection to remove
"""
try:
self._connections.remove(connection)
except ValueError:
pass
[docs]
@_override
def run(self, max_connection: int = 1) -> None:
"""
Start listening for the connections and stage each connection in a new thread.
:param max_connection: the maximum number of connections allowed at the same time
"""
self._socket.bind(("0.0.0.0", self._port))
self._socket.listen(max_connection)
self._callback.on_initialize(self)
while _thread_flags.active:
socket, address = self._socket.accept()
self._callback.on_connect(self, connection := Connection(socket, address, separator=self._separator,
on_close=lambda c: self.remove_connection(c)))
self._connections.append(connection)
_Thread(target=self._stage, args=(connection,), daemon=True).start()
[docs]
def broadcast(self, msg: bytes) -> None:
"""
Send the message to all connected clients.
:param msg: the message to send
"""
for c in self._connections:
try:
c.send(msg)
except IOError:
self.remove_connection(c)
[docs]
@_override
def close(self) -> None:
"""
Close all active connections.
"""
self._socket.close()
for connection in self._connections:
connection.close()