Source code for dasbus.unix

#
# Support for Unix file descriptors.
#
# Copyright (C) 2022  Red Hat, Inc.  All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
# USA
#
import logging

from dasbus.constants import DBUS_FLAG_NONE
from dasbus.typing import VariantUnpacking, get_variant
from dasbus.client.handler import GLibClient
from dasbus.server.handler import GLibServer

import gi
gi.require_version("Gio", "2.0")
from gi.repository import Gio

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Names exported by ``from dasbus.unix import *``: the client and
# server handler classes defined in this module.
__all__ = [
    "GLibClientUnix",
    "GLibServerUnix",
]


def acquire_fds(variant):
    """Extract the Unix file descriptors held by a variant.

    Every value of the Unix FD type found in the variant is replaced
    with its position in a newly collected list of file descriptors.

    The function returns a pair. For a variant that is None, the pair
    is (None, None). For a variant without any Unix file descriptors,
    the pair is the unchanged variant and None. Otherwise, the pair is
    the variant with indexes and a Gio.UnixFDList with the collected
    file descriptors.

    :param variant: a variant with Unix file descriptors or None
    :return: a variant with indexes and a list of Unix file descriptors
    """
    if variant is None:
        return None, None

    collected = []

    def _register(fd):
        # The position of the new item is the current length of the list.
        position = len(collected)
        collected.append(fd)
        return position

    swapped = UnixFDSwap.apply(variant, _register)

    if collected:
        return swapped, Gio.UnixFDList.new_from_array(collected)

    # Nothing was collected, so hand back the original variant.
    return variant, None


def restore_fds(variant, fd_list: Gio.UnixFDList):
    """Restore Unix file descriptors in a variant.

    If the variant is None, return None. If there is no list of file
    descriptors, or the list is empty, return the variant unchanged.
    Otherwise, return a variant where every index into the list is
    replaced with the file descriptor at that position.

    An index outside of the list (including a negative one) is
    replaced with -1.

    :param variant: a variant with indexes into fd_list
    :param fd_list: a list of Unix file descriptors or None
    :return: a variant with Unix file descriptors
    """
    if variant is None:
        return None

    if fd_list is None:
        return variant

    # Take the file descriptors out of the Gio list object.
    fds = fd_list.steal_fds()

    if not fds:
        return variant

    def _get_fd(index):
        # Check the bounds explicitly. Plain list indexing would accept
        # a negative index and silently return a descriptor counted from
        # the end of the list instead of reporting an invalid index.
        if 0 <= index < len(fds):
            return fds[index]

        return -1

    return UnixFDSwap.apply(variant, _get_fd)


class UnixFDSwap(VariantUnpacking):
    """Variant processing that replaces values of the UnixFD type.

    The replacement value of each Unix FD handle is provided by
    a swapping function that is passed through the processing
    as the first extra argument.
    """

    @classmethod
    def apply(cls, variant, swap):
        """Replace Unix FD values of a variant using a swapping function.

        The swapping function is called with the handle of each value
        of the UnixFD type and its return value is used instead. It is
        expected to map a unix file descriptor to an index into an array
        of unix file descriptors, or an index to a file descriptor.

        :param variant: a variant to modify
        :param swap: a swapping function
        :return: a modified variant
        """
        return cls._recreate_variant(variant, swap)

    @classmethod
    def _handle_variant(cls, variant, *extras):
        """Handle a variant that wraps another variant."""
        inner = variant.get_variant()
        return cls._recreate_variant(inner, *extras)

    @classmethod
    def _handle_value(cls, variant, *extras):
        """Handle a basic value."""
        # Anything but the unix file descriptor is just unpacked.
        if variant.get_type_string() != 'h':
            return variant.unpack()

        # The swapping function is the first of the extra arguments.
        swap_function, *_ = extras
        return swap_function(variant.get_handle())

    @classmethod
    def _recreate_variant(cls, variant, *extras):
        """Build a variant of the same type with swapped values."""
        signature = variant.get_type_string()

        # Only types that mention 'h' or 'v' can carry a unix file
        # descriptor. Return everything else untouched.
        if not ('h' in signature or 'v' in signature):
            return variant

        # Compute the new value. The _process_variant method is not
        # defined in this class (presumably inherited from the base).
        new_value = cls._process_variant(variant, *extras)

        # Wrap the new value in a variant of the original type.
        return get_variant(signature, new_value)


class GLibClientUnix(GLibClient):
    """The low-level DBus client library based on GLib.

    Calls DBus methods with support for Unix file descriptors
    in the parameters and in the results of the calls.
    """

    @classmethod
    def sync_call(cls, connection, service_name, object_path, interface_name,
                  method_name, parameters, reply_type, flags=DBUS_FLAG_NONE,
                  timeout=GLibClient.DBUS_TIMEOUT_NONE):
        """Synchronously call a DBus method.

        :param connection: a DBus connection used for the call
        :param service_name: a DBus name of the service
        :param object_path: a DBus path of the object
        :param interface_name: a DBus name of the interface
        :param method_name: a name of the DBus method
        :param parameters: a variant with parameters or None
        :param reply_type: an expected type of the reply
        :param flags: flags of the call
        :param timeout: a timeout of the call
        :return: a result of the DBus call
        """
        # Process Unix file descriptors in parameters.
        parameters, fd_list = acquire_fds(parameters)

        # Call the DBus method.
        result = connection.call_with_unix_fd_list_sync(
            service_name,
            object_path,
            interface_name,
            method_name,
            parameters,
            reply_type,
            flags,
            timeout,
            fd_list,
            None
        )

        # Restore Unix file descriptors in the result. The result is
        # unpacked into the variant and the list of file descriptors.
        return restore_fds(*result)

    @classmethod
    def async_call(cls, connection, service_name, object_path, interface_name,
                   method_name, parameters, reply_type, callback,
                   callback_args=(), flags=DBUS_FLAG_NONE,
                   timeout=GLibClient.DBUS_TIMEOUT_NONE):
        """Asynchronously call a DBus method.

        The callback is called with a function that returns the result
        of the call, followed by the callback arguments.

        :param connection: a DBus connection used for the call
        :param service_name: a DBus name of the service
        :param object_path: a DBus path of the object
        :param interface_name: a DBus name of the interface
        :param method_name: a name of the DBus method
        :param parameters: a variant with parameters or None
        :param reply_type: an expected type of the reply
        :param callback: a callback to call when the call is finished
        :param callback_args: extra arguments of the callback
        :param flags: flags of the call
        :param timeout: a timeout of the call
        """
        # Process Unix file descriptors in parameters.
        parameters, fd_list = acquire_fds(parameters)

        # Call the DBus method.
        connection.call_with_unix_fd_list(
            service_name,
            object_path,
            interface_name,
            method_name,
            parameters,
            reply_type,
            flags,
            timeout,
            fd_list,
            callback=cls._async_call_finish,
            user_data=(callback, callback_args)
        )

    @classmethod
    def _async_call_finish(cls, source_object, result_object, user_data):
        """Finish an asynchronous DBus method call.

        :param source_object: an object that started the call
        :param result_object: an object with the result of the call
        :param user_data: a tuple of the callback and its arguments
        """
        # Prepare the user's callback.
        callback, callback_args = user_data

        def _finish_call():
            # Retrieve the result of the call.
            result = source_object.call_with_unix_fd_list_finish(
                result_object
            )

            # Restore Unix file descriptors in the result.
            return restore_fds(*result)

        # Call user's callback. The result is retrieved only when
        # the callback calls the provided function.
        callback(_finish_call, *callback_args)
class GLibServerUnix(GLibServer):
    """The low-level DBus server library based on GLib.

    Adds support for Unix file descriptors to the base class.
    """

    @classmethod
    def emit_signal(cls, connection, object_path, interface_name,
                    signal_name, parameters, destination=None):
        """Emit a DBus signal.

        GLib doesn't seem to support Unix file descriptors in signals.
        Swap Unix file descriptors with indexes into a list of Unix file
        descriptors, but emit just the indexes. Log a warning to inform
        users about the limited support.

        :param connection: a DBus connection used for the signal
        :param object_path: a DBus path of the object
        :param interface_name: a DBus name of the interface
        :param signal_name: a name of the DBus signal
        :param parameters: a variant with parameters or None
        :param destination: a destination of the signal or None
        """
        # Process Unix file descriptors in parameters.
        parameters, fd_list = acquire_fds(parameters)

        if fd_list:
            log.warning("Unix file descriptors in signals are unsupported.")

        # Emit the signal without Unix file descriptors.
        connection.emit_signal(
            destination,
            object_path,
            interface_name,
            signal_name,
            parameters
        )

    @classmethod
    def set_call_reply(cls, invocation, out_type, out_value):
        """Set the reply of the DBus call.

        :param invocation: an invocation of the DBus call
        :param out_type: a type of the reply
        :param out_value: a value of the reply
        """
        # Process Unix file descriptors in the reply. The reply
        # arguments are the variant and the list of file descriptors.
        reply_value = cls._get_reply_value(out_type, out_value)
        reply_args = acquire_fds(reply_value)

        # Send the reply.
        invocation.return_value_with_unix_fd_list(*reply_args)

    @classmethod
    def _object_callback(cls, connection, sender, object_path,
                         interface_name, method_name, parameters,
                         invocation, user_data):
        """A method call closure of a DBus object.

        :param connection: a DBus connection of the call
        :param sender: a sender of the call
        :param object_path: a DBus path of the object
        :param interface_name: a DBus name of the interface
        :param method_name: a name of the called DBus method
        :param parameters: a variant with parameters of the call
        :param invocation: an invocation of the DBus call
        :param user_data: a tuple of the callback and its arguments
        """
        # Prepare the user's callback.
        callback, callback_args = user_data

        # Restore Unix file descriptors in parameters.
        fd_list = invocation.get_message().get_unix_fd_list()
        parameters = restore_fds(parameters, fd_list)

        # Call user's callback.
        callback(
            invocation,
            interface_name,
            method_name,
            parameters,
            *callback_args
        )