#
# Support for Unix file descriptors.
#
# Copyright (C) 2022 Red Hat, Inc. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
# USA
#
import logging
from dasbus.constants import DBUS_FLAG_NONE
from dasbus.typing import VariantUnpacking, get_variant
from dasbus.client.handler import GLibClient
from dasbus.server.handler import GLibServer
import gi
gi.require_version("Gio", "2.0")
from gi.repository import Gio
log = logging.getLogger(__name__)
__all__ = [
"GLibClientUnix",
"GLibServerUnix",
]
def acquire_fds(variant):
"""Acquire Unix file descriptors contained in a variant.
Return a variant with indexes into a list of Unix file descriptors
and the list of Unix file descriptors.
If the variant is None, or the variant doesn't contain any Unix
file descriptors, return None instead of the list.
:param variant: a variant with Unix file descriptors
:return: a variant with indexes and a list of Unix file descriptors
"""
if variant is None:
return None, None
fd_list = []
def _get_idx(fd):
fd_list.append(fd)
return len(fd_list) - 1
variant_without_fds = UnixFDSwap.apply(variant, _get_idx)
if not fd_list:
return variant, None
return variant_without_fds, Gio.UnixFDList.new_from_array(fd_list)
def restore_fds(variant, fd_list: Gio.UnixFDList):
"""Restore Unix file descriptors in a variant.
If the variant is None, return None. Otherwise, return
a variant with Unix file descriptors.
:param variant: a variant with indexes into fd_list
:param fd_list: a list of Unix file descriptors
:return: a variant with Unix file descriptors
"""
if variant is None:
return None
if fd_list is None:
return variant
fd_list = fd_list.steal_fds()
if not fd_list:
return variant
def _get_fd(index):
try:
return fd_list[index]
except IndexError:
return -1
return UnixFDSwap.apply(variant, _get_fd)
class UnixFDSwap(VariantUnpacking):
"""Class for swapping values of the UnixFD type."""
@classmethod
def apply(cls, variant, swap):
"""Swap unix file descriptors with indices.
The provided function should swap a unix file
descriptor with an index into an array of unix
file descriptors or vice versa.
:param variant: a variant to modify
:param swap: a swapping function
:return: a modified variant
"""
return cls._recreate_variant(variant, swap)
@classmethod
def _handle_variant(cls, variant, *extras):
"""Handle a variant."""
return cls._recreate_variant(variant.get_variant(), *extras)
@classmethod
def _handle_value(cls, variant, *extras):
"""Handle a basic value."""
type_string = variant.get_type_string()
# Handle the unix file descriptor.
if type_string == 'h':
# Get the swapping function.
swap, *_ = extras
# Swap the values.
return swap(variant.get_handle())
return variant.unpack()
@classmethod
def _recreate_variant(cls, variant, *extras):
"""Create a variant with swapped values."""
type_string = variant.get_type_string()
# Do nothing if there is no unix file descriptor to handle.
if 'h' not in type_string and 'v' not in type_string:
return variant
# Get a new value of the variant.
value = cls._process_variant(variant, *extras)
# Create a new variant.
return get_variant(type_string, value)
[docs]class GLibClientUnix(GLibClient):
"""The low-level DBus client library based on GLib."""
[docs] @classmethod
def sync_call(cls, connection, service_name, object_path, interface_name,
method_name, parameters, reply_type, flags=DBUS_FLAG_NONE,
timeout=GLibClient.DBUS_TIMEOUT_NONE):
"""Synchronously call a DBus method.
:return: a result of the DBus call
"""
# Process Unix file descriptors in parameters.
parameters, fd_list = acquire_fds(parameters)
# Call the DBus method.
result = connection.call_with_unix_fd_list_sync(
service_name,
object_path,
interface_name,
method_name,
parameters,
reply_type,
flags,
timeout,
fd_list,
None
)
# Restore Unix file descriptors in the result.
return restore_fds(*result)
[docs] @classmethod
def async_call(cls, connection, service_name, object_path, interface_name,
method_name, parameters, reply_type, callback,
callback_args=(), flags=DBUS_FLAG_NONE,
timeout=GLibClient.DBUS_TIMEOUT_NONE):
"""Asynchronously call a DBus method."""
# Process Unix file descriptors in parameters.
parameters, fd_list = acquire_fds(parameters)
# Call the DBus method.
connection.call_with_unix_fd_list(
service_name,
object_path,
interface_name,
method_name,
parameters,
reply_type,
flags,
timeout,
fd_list,
callback=cls._async_call_finish,
user_data=(callback, callback_args)
)
@classmethod
def _async_call_finish(cls, source_object, result_object, user_data):
"""Finish an asynchronous DBus method call."""
# Prepare the user's callback.
callback, callback_args = user_data
def _finish_call():
# Retrieve the result of the call.
result = source_object.call_with_unix_fd_list_finish(
result_object
)
# Restore Unix file descriptors in the result.
return restore_fds(*result)
# Call user's callback.
callback(_finish_call, *callback_args)
[docs]class GLibServerUnix(GLibServer):
"""The low-level DBus server library based on GLib.
Adds Unix FD Support to base class"""
[docs] @classmethod
def emit_signal(cls, connection, object_path, interface_name,
signal_name, parameters, destination=None):
"""Emit a DBus signal.
GLib doesn't seem to support Unix file descriptors in signals.
Swap Unix file descriptors with indexes into a list of Unix file
descriptors, but emit just the indexes. Log a warning to inform
users about the limited support.
"""
# Process Unix file descriptors in parameters.
parameters, fd_list = acquire_fds(parameters)
if fd_list:
log.warning("Unix file descriptors in signals are unsupported.")
# Emit the signal without Unix file descriptors.
connection.emit_signal(
destination,
object_path,
interface_name,
signal_name,
parameters
)
[docs] @classmethod
def set_call_reply(cls, invocation, out_type, out_value):
"""Set the reply of the DBus call."""
# Process Unix file descriptors in the reply.
reply_value = cls._get_reply_value(out_type, out_value)
reply_args = acquire_fds(reply_value)
# Send the reply.
invocation.return_value_with_unix_fd_list(*reply_args)
@classmethod
def _object_callback(cls, connection, sender, object_path,
interface_name, method_name, parameters,
invocation, user_data):
"""A method call closure of a DBus object."""
# Prepare the user's callback.
callback, callback_args = user_data
# Restore Unix file descriptors in parameters.
fd_list = invocation.get_message().get_unix_fd_list()
parameters = restore_fds(parameters, fd_list)
# Call user's callback.
callback(
invocation,
interface_name,
method_name,
parameters,
*callback_args
)