Source code for jeepney.low_level

from enum import Enum, IntEnum
import struct

class SizeLimitError(ValueError):
    pass

class Endianness(Enum):
    little = 1
    big = 2

    def struct_code(self):
        return '<' if (self is Endianness.little) else '>'

    def dbus_code(self):
        return b'l' if (self is Endianness.little) else b'B'


endian_map = {b'l': Endianness.little, b'B': Endianness.big}


class MessageType(Enum):
    method_call = 1
    method_return = 2
    error = 3
    signal = 4


msg_type_map = {t.value: t for t in MessageType}

# Flags:
NO_REPLY_EXPECTED = 1
NO_AUTO_START = 2
ALLOW_INTERACTIVE_AUTHORIZATION = 4


class HeaderFields(IntEnum):
    path = 1
    interface = 2
    member = 3
    error_name = 4
    reply_serial = 5
    destination = 6
    sender = 7
    signature = 8
    unix_fds = 9

header_fields_map = {t.value: t for t in HeaderFields}

def padding(pos, step):
    pad = step - (pos % step)
    if pad == step:
        return 0
    return pad


class FixedType:
    def __init__(self, size, struct_code):
        self.size = self.alignment = size
        self.struct_code = struct_code

    def parse_data(self, buf, pos, endianness):
        pos += padding(pos, self.alignment)
        code = endianness.struct_code() + self.struct_code
        val = struct.unpack(code, buf[pos:pos + self.size])[0]
        return val, pos + self.size

    def serialise(self, data, pos, endianness):
        pad = b'\0' * padding(pos, self.alignment)
        code = endianness.struct_code() + self.struct_code
        return pad + struct.pack(code, data)

    def __repr__(self):
        return 'FixedType({!r}, {!r})'.format(self.size, self.struct_code)

    def __eq__(self, other):
        return (type(other) is FixedType) and (self.size == other.size) \
                and (self.struct_code == other.struct_code)


simple_types = {
    'y': FixedType(1, 'B'),  # unsigned 8 bit
    'b': FixedType(4, 'I'),  # bool
    'n': FixedType(2, 'h'),  # signed 16 bit
    'q': FixedType(2, 'H'),  # unsigned 16 bit
    'i': FixedType(4, 'i'),  # signed 32-bit
    'u': FixedType(4, 'I'),  # unsigned 32-bit
    'x': FixedType(8, 'q'),  # signed 64-bit
    't': FixedType(8, 'Q'),  # unsigned 64-bit
    'd': FixedType(8, 'd'),  # double
    'h': FixedType(8, 'I'),  # file descriptor (32-bit unsigned, index) TODO
}


class StringType:
    def __init__(self, length_type):
        self.length_type = length_type

    @property
    def alignment(self):
        return self.length_type.size

    def parse_data(self, buf, pos, endianness):
        length, pos = self.length_type.parse_data(buf, pos, endianness)
        end = pos + length
        val = buf[pos:end].decode('utf-8')
        assert buf[end:end + 1] == b'\0'
        return val, end + 1

    def serialise(self, data, pos, endianness):
        if not isinstance(data, str):
            raise TypeError("Expected str, not {!r}".format(data))
        encoded = data.encode('utf-8')
        len_data = self.length_type.serialise(len(encoded), pos, endianness)
        return len_data + encoded + b'\0'

    def __repr__(self):
        return 'StringType({!r})'.format(self.length_type)

    def __eq__(self, other):
        return (type(other) is StringType) \
               and (self.length_type == other.length_type)


simple_types.update({
    's': StringType(simple_types['u']),  # String
    'o': StringType(simple_types['u']),  # Object path
    'g': StringType(simple_types['y']),  # Signature
})


class Struct:
    alignment = 8

    def __init__(self, fields):
        if any(isinstance(f, DictEntry) for f in fields):
            raise TypeError("Found dict entry outside array")
        self.fields = fields

    def parse_data(self, buf, pos, endianness):
        pos += padding(pos, 8)
        res = []
        for field in self.fields:
            v, pos = field.parse_data(buf, pos, endianness)
            res.append(v)
        return tuple(res), pos

    def serialise(self, data, pos, endianness):
        if not isinstance(data, tuple):
            raise TypeError("Expected tuple, not {!r}".format(data))
        if len(data) != len(self.fields):
            raise ValueError("{} entries for {} fields".format(
                len(data), len(self.fields)
            ))
        pad = b'\0' * padding(pos, self.alignment)
        pos += len(pad)
        res_pieces = []
        for item, field in zip(data, self.fields):
            res_pieces.append(field.serialise(item, pos, endianness))
            pos += len(res_pieces[-1])
        return pad + b''.join(res_pieces)

    def __repr__(self):
        return "{}({!r})".format(type(self).__name__, self.fields)

    def __eq__(self, other):
        return (type(other) is type(self)) and (self.fields == other.fields)


class DictEntry(Struct):
    def __init__(self, fields):
        if len(fields) != 2:
            raise TypeError(
                "Dict entry must have 2 fields, not %d" % len(fields))
        if not isinstance(fields[0], (FixedType, StringType)):
            raise TypeError(
                "First field in dict entry must be simple type, not {}"
                .format(type(fields[0])))
        super().__init__(fields)

class Array:
    alignment = 4
    length_type = FixedType(4, 'I')

    def __init__(self, elt_type):
        self.elt_type = elt_type

    def parse_data(self, buf, pos, endianness):
        # print('Array start', pos)
        length, pos = self.length_type.parse_data(buf, pos, endianness)
        pos += padding(pos, self.elt_type.alignment)
        end = pos + length
        res = []
        while pos < end:
            # print('Array elem', pos)
            v, pos = self.elt_type.parse_data(buf, pos, endianness)
            res.append(v)
        return res, pos

    def serialise(self, data, pos, endianness):
        if isinstance(self.elt_type, DictEntry) and isinstance(data, dict):
            data = sorted(data.items())
        elif (self.elt_type == simple_types['y']) and isinstance(data, bytes):
            pass
        elif not isinstance(data, list):
            raise TypeError("Not suitable for array: {!r}".format(data))

        # Fail fast if we know in advance that the data is too big:
        if isinstance(self.elt_type, FixedType):
            if (self.elt_type.size * len(data)) > 2**26:
                raise SizeLimitError("Array size exceeds 64 MiB limit")

        pad1 = padding(pos, self.alignment)
        pos_after_length = pos + pad1 + 4
        pad2 = padding(pos_after_length, self.elt_type.alignment)
        data_pos = pos_after_length + pad2
        limit_pos = data_pos + 2**26
        chunks = []
        for item in data:
            chunks.append(self.elt_type.serialise(item, data_pos, endianness))
            data_pos += len(chunks[-1])
            if data_pos > limit_pos:
                raise SizeLimitError("Array size exceeds 64 MiB limit")
        buf = b''.join(chunks)
        len_data = self.length_type.serialise(len(buf), pos+pad1, endianness)
        pos += len(len_data)
        # print('Array ser: pad1={!r}, len_data={!r}, pad2={!r}, buf={!r}'.format(
        #       pad1, len_data, pad2, buf))
        return (b'\0' * pad1) + len_data + (b'\0' * pad2) + buf

    def __repr__(self):
        return 'Array({!r})'.format(self.elt_type)

    def __eq__(self, other):
        return (type(other) is Array) and (self.elt_type == other.elt_type)


class Variant:
    alignment = 1

    def parse_data(self, buf, pos, endianness):
        # print('variant', pos)
        sig, pos = simple_types['g'].parse_data(buf, pos, endianness)
        # print('variant sig:', repr(sig), pos)
        valtype = parse_signature(list(sig))
        val, pos = valtype.parse_data(buf, pos, endianness)
        # print('variant done', (sig, val), pos)
        return (sig, val), pos

    def serialise(self, data, pos, endianness):
        sig, data = data
        valtype = parse_signature(list(sig))
        sig_buf = simple_types['g'].serialise(sig, pos, endianness)
        return sig_buf + valtype.serialise(data, pos + len(sig_buf), endianness)

    def __repr__(self):
        return 'Variant()'

    def __eq__(self, other):
        return type(other) is Variant

def parse_signature(sig):
    """Parse a symbolic signature into objects.
    """
    # Based on http://norvig.com/lispy.html
    token = sig.pop(0)
    if token == 'a':
        return Array(parse_signature(sig))
    if token == 'v':
        return Variant()
    elif token == '(':
        fields = []
        while sig[0] != ')':
            fields.append(parse_signature(sig))
        sig.pop(0)  # )
        return Struct(fields)
    elif token == '{':
        de = []
        while sig[0] != '}':
            de.append(parse_signature(sig))
        sig.pop(0)  # }
        return DictEntry(de)
    elif token in ')}':
        raise ValueError('Unexpected end of struct')
    else:
        return simple_types[token]


def calc_msg_size(buf):
    endian, = struct.unpack('c', buf[:1])
    endian = endian_map[endian]
    body_length, = struct.unpack(endian.struct_code() + 'I', buf[4:8])
    fields_array_len, = struct.unpack(endian.struct_code() + 'I', buf[12:16])
    header_len = 16 + fields_array_len
    return header_len + padding(header_len, 8) + body_length


_header_fields_type = Array(Struct([simple_types['y'], Variant()]))


def parse_header_fields(buf, endianness):
    l, pos = _header_fields_type.parse_data(buf, 12, endianness)
    return {header_fields_map[k]: v[1] for (k, v) in l}, pos


header_field_codes = {
    1: 'o',
    2: 's',
    3: 's',
    4: 's',
    5: 'u',
    6: 's',
    7: 's',
    8: 'g',
    9: 'u',
}


def serialise_header_fields(d, endianness):
    l = [(i.value, (header_field_codes[i], v)) for (i, v) in sorted(d.items())]
    return _header_fields_type.serialise(l, 12, endianness)


class Header:
    def __init__(self, endianness, message_type, flags, protocol_version,
                 body_length, serial, fields):
        self.endianness = endianness
        self.message_type = message_type
        self.flags = flags
        self.protocol_version = protocol_version
        self.body_length = body_length
        self.serial = serial
        self.fields = fields

    def __repr__(self):
        return 'Header({!r}, {!r}, {!r}, {!r}, {!r}, {!r}, fields={!r})'.format(
            self.endianness, self.message_type, self.flags,
            self.protocol_version, self.body_length, self.serial, self.fields)

    def serialise(self):
        s = self.endianness.struct_code() + 'cBBBII'
        return struct.pack(s, self.endianness.dbus_code(),
                           self.message_type.value, self.flags,
                           self.protocol_version,
                           self.body_length, self.serial) \
                + serialise_header_fields(self.fields, self.endianness)

    @classmethod
    def from_buffer(cls, buf):
        endian, msgtype, flags, pv = struct.unpack('cBBB', buf[:4])
        endian = endian_map[endian]
        bodylen, serial = struct.unpack(endian.struct_code() + 'II', buf[4:12])
        fields, pos = parse_header_fields(buf, endian)
        return cls(endian, msg_type_map[msgtype], flags, pv, bodylen,
                   serial, fields), pos


[docs]class Message: """Object representing a DBus message. It's not normally necessary to construct this directly: use higher level functions and methods instead. """ def __init__(self, header, body): self.header = header self.body = body def __repr__(self): return "{}({!r}, {!r})".format(type(self).__name__, self.header, self.body) @classmethod def from_buffer(cls, buf): header, pos = Header.from_buffer(buf) body = () if HeaderFields.signature in header.fields: sig = header.fields[HeaderFields.signature] body_type = parse_signature(list('(%s)' % sig)) body = body_type.parse_data(buf, pos, header.endianness)[0] return cls(header, body)
[docs] def serialise(self): """Convert this message to bytes.""" endian = self.header.endianness if HeaderFields.signature in self.header.fields: sig = self.header.fields[HeaderFields.signature] body_type = parse_signature(list('(%s)' % sig)) body_buf = body_type.serialise(self.body, 0, endian) else: body_buf = b'' self.header.body_length = len(body_buf) header_buf = self.header.serialise() pad = b'\0' * padding(len(header_buf), 8) return header_buf + pad + body_buf
[docs]class Parser: """Parse DBus messages from a stream of incoming data. """ def __init__(self): self.buf = b'' self.next_msg_size = None
[docs] def feed(self, data): """Feed the parser newly read data. Returns a list of messages completed by the new data. """ self.buf += data return list(iter(self._read1, None))
def _read1(self): if self.next_msg_size is None: if len(self.buf) >= 16: self.next_msg_size = calc_msg_size(self.buf) nms = self.next_msg_size if (nms is not None) and len(self.buf) >= nms: raw_msg, self.buf = self.buf[:nms], self.buf[nms:] msg = Message.from_buffer(raw_msg) self.next_msg_size = None return msg