#
# Server support for DBus containers
#
# Copyright (C) 2019 Red Hat, Inc. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
# USA
#
from dasbus.namespace import get_dbus_path
from dasbus.server.publishable import Publishable
from dasbus.typing import ObjPath, List
# Public names exported by a wildcard import of this module.
__all__ = [
    "DBusContainerError",
    "DBusContainer",
]
class DBusContainerError(Exception):
    """General exception for DBus container errors.

    Raised by the container when a DBus path cannot be resolved
    into an object or an object cannot be resolved into a DBus path.
    """
    pass
class DBusContainer(object):
    """The container of DBus objects.

    A DBus container should be used to dynamically publish Publishable
    objects within the same namespace. It generates a unique DBus path
    for each object. It is able to resolve a DBus path into an object
    and an object into a DBus path.

    Example:

    .. code-block:: python

        # Create a container of tasks.
        container = DBusContainer(
            namespace=("my", "project"),
            basename="Task",
            message_bus=DBus
        )

        # Publish a task.
        path = container.to_object_path(MyTask())

        # Resolve an object path into a task.
        task = container.from_object_path(path)

    """

    def __init__(self, message_bus, namespace, basename=None):
        """Create a new container.

        If no base name is given, the last name of the namespace
        is used as the base name of the generated DBus paths.

        :param message_bus: a message bus
        :param namespace: a sequence of names
        :param basename: a string with the base name
        """
        self._message_bus = message_bus

        if basename:
            namespace = (*namespace, basename)

        # The last name is the base name of the generated paths,
        # everything in front of it is the namespace.
        self._namespace = namespace[:-1]
        self._basename = namespace[-1]

        # Published objects keyed by their DBus paths.
        self._container = {}

        # Identities (id) of the published objects. The container
        # keeps a reference to every published object, so an id
        # stays bound to its object while the object is stored here.
        self._published = set()

        # Number of generated DBus paths.
        self._counter = 0

    def set_namespace(self, namespace):
        """Set the namespace.

        All DBus objects from the container should use the same
        namespace, so the namespace should be set up before any
        of the DBus objects are published.

        Only the namespace is replaced, the base name is kept.

        :param namespace: a sequence of names
        """
        self._namespace = namespace

    def from_object_path(self, object_path: ObjPath):
        """Convert a DBus path to a published object.

        If no published object is found for the given DBus path,
        raise DBusContainerError.

        :param object_path: a DBus path
        :return: a published object
        :raise: DBusContainerError if no object is found
        """
        return self._find_object(object_path)

    def to_object_path(self, obj) -> ObjPath:
        """Convert a publishable object to a DBus path.

        If no DBus path is found for the given object, publish
        the object on the container message bus with a unique
        DBus path generated from the container namespace.

        :param obj: a publishable object
        :return: a DBus path
        :raise: TypeError if the object is not publishable
        """
        if not isinstance(obj, Publishable):
            raise TypeError(
                "Type '{}' is not publishable.".format(type(obj).__name__)
            )

        if not self._is_object_published(obj):
            # Publishing returns the newly generated path, so there
            # is no need to search the container for it afterwards.
            return self._publish_object(obj)

        return self._find_object_path(obj)

    def from_object_path_list(self, object_paths: List[ObjPath]):
        """Convert DBus paths to published objects.

        :param object_paths: a list of DBus paths
        :return: a list of published objects
        :raise: DBusContainerError if any of the paths is unknown
        """
        return [self.from_object_path(path) for path in object_paths]

    def to_object_path_list(self, objects) -> List[ObjPath]:
        """Convert publishable objects to DBus paths.

        :param objects: a list of publishable objects
        :return: a list of DBus paths
        :raise: TypeError if any of the objects is not publishable
        """
        return [self.to_object_path(obj) for obj in objects]

    def _is_object_published(self, obj):
        """Is the given object published?

        The check is based on the identity of the object.

        :param obj: an object
        :return: True if the object is published, otherwise False
        """
        return id(obj) in self._published

    def _publish_object(self, obj: Publishable):
        """Publish the given object.

        The object is registered in the container only after
        it was published on the message bus.

        :param obj: an object to publish
        :return: an object path
        """
        object_path = self._generate_object_path()

        self._message_bus.publish_object(
            object_path,
            obj.for_publication()
        )

        self._container[object_path] = obj
        self._published.add(id(obj))
        return object_path

    def _find_object_path(self, obj):
        """Find a DBus path of the object.

        The object is looked up by its identity.

        :param obj: a published object
        :return: a DBus path
        :raise: DBusContainerError if no object path is found
        """
        for object_path, found_obj in self._container.items():
            if found_obj is obj:
                return object_path

        raise DBusContainerError(
            "No object path found."
        )

    def _find_object(self, object_path):
        """Find an object by its DBus path.

        :param object_path: a DBus path
        :return: a published object
        :raise: DBusContainerError if no object is found
        """
        try:
            return self._container[object_path]
        except KeyError:
            # Hide the internal KeyError from the traceback.
            raise DBusContainerError(
                "Unknown object path '{}'.".format(object_path)
            ) from None

    def _generate_object_path(self):
        """Generate a unique object path.

        The path is built from the namespace, the base name and
        the value of a counter that is increased with every call.

        This method is not thread safe.

        :return: a unique object path
        """
        self._counter += 1

        return get_dbus_path(
            *self._namespace,
            self._basename,
            str(self._counter)
        )