---
test/wfd-source | 1334 +++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 1334 insertions(+)
create mode 100755 test/wfd-source
diff --git a/test/wfd-source b/test/wfd-source
new file mode 100755
index 00000000..7ac41342
--- /dev/null
+++ b/test/wfd-source
@@ -0,0 +1,1334 @@
+#! /usr/bin/python3
+#
+# Copyright (C) 2020 Intel Corporation
+#
+# A simplified WFD source that streams the X11 screen using gstreamer.
+# A more complete solution would create a virtual screen visible through the
+# normal system calls, xrandr, etc., with its pixel aspect ratio, EDID data and
+# what not. This would allow the user to configure it like a real display in
+# mirror mode or side-by-side mode.
+
+import sys
+import dbus
+import dbus.mainloop.glib
+import socket
+import collections
+import collections.abc
+import random
+import dataclasses
+
+import gi
+gi.require_version('GLib', '2.0')
+gi.require_version('Gst', '1.0')
+gi.require_version('Gtk', '3.0')
+from gi.repository import GLib, Gst, Gtk, Gdk, Pango
+
+class WFDRTSPServer:
+ class RTSPException(Exception):
+ pass
+
+ def __init__(self, port, state_handler, error_handler):
+ # Should start the TCP server only on the P2P connection's local IP but we
won't
+ # know the IP or interface name until after the connection is established. At
that
+ # time the sink may try to make the TCP connection at any time so our listen
+ # socket should be up before this.
+ server_address = ('0.0.0.0', port)
+ self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ self.server.bind(server_address)
+ self.server.listen(1)
+ GLib.io_add_watch(self.server, GLib.IO_IN, self.handle_connection)
+ self.conn = None
+ self.tx_queue = []
+ self.rx_queue = b''
+
+ self.state_handler = state_handler
+ self.error_handler = error_handler
+ self.sm_init()
+
+ def handle_data_out(self, conn, *args):
+ try:
+ cmd = self.tx_queue.pop(0)
+ sent = self.conn.send(cmd)
+
+ if sent < len(cmd):
+ self.tx_queue.insert(0, cmd[sent:])
+
+ return len(self.tx_queue) > 0
+ except Exception as e:
+ self.error_handler(e)
+ return False
+
+ def tx_queue_append(self, cmd):
+ if not self.tx_queue:
+ GLib.io_add_watch(self.conn.fileno(), GLib.IO_OUT, self.handle_data_out)
+
+ self.tx_queue.append(cmd.encode('utf-8'))
+ self.debug('queued cmd: ' + cmd)
+
+ def handle_data_hup(self, conn, *args):
+ try:
+ self.debug('HUP')
+ self.error('Disconnected')
+ except Exception as e:
+ self.error_handler(e)
+ return False
+
+ def handle_data_in(self, conn, *args):
+ try:
+ newdata = self.conn.recv(4096)
+ if len(newdata) == 0:
+ self.debug('recv returned 0 bytes')
+ # Disconnect from P2P
+ self.error('Disconnected')
+ return False
+
+ self.debug('received data: ' + str(newdata))
+ self.rx_queue += newdata
+
+ while b'\r\n\r\n' in self.rx_queue:
+ msg, content = self.rx_queue.split(b'\r\n\r\n', 1)
+ lines = msg.split(b'\r\n')
+
+ headers = {}
+ for line in lines[1:]:
+ if b':' not in line:
+ # Bad syntax
+ rxbuf = b''
+ return True
+
+ name, value = line.decode('utf8').split(':', 1)
+ name = name.lower()
+ while len(value) and value[0] == ' ':
+ value = value[1:]
+
+ if name in headers:
+ # Duplicate
+ rxbuf = b''
+ return True
+
+ headers[name] = value
+
+ cl = 0
+ if 'content-length' in headers:
+ try:
+ cl = int(headers['content-length'])
+ if cl < 1 or cl > 1000:
+ raise Exception('')
+ except:
+ # Bad syntax
+ rxbuf = b''
+ return True
+
+ if len(content) < cl:
+ # Wait for more data
+ return True
+
+ top_line = lines[0].decode('utf8').split(None, 2)
+ self.rx_queue = self.rx_queue[len(msg) + 4 + cl:]
+ content = content[:cl]
+
+ if top_line[2] == 'RTSP/1.0':
+ self.source_handle_message(method=top_line[0], target=top_line[1],
headers=headers, content=content)
+ elif top_line[0] == 'RTSP/1.0':
+ try:
+ status = int(top_line[1])
+ if status < 1 or status > 999:
+ raise Exception('Status out of range')
+ except:
+ self.error('Couldn\'t parse response status')
+
+ self.source_handle_message(status=status, reason=top_line[2],
headers=headers, content=content)
+ else:
+ # Bad protocol
+ self.error('Unknown protocol in ' + str(top_line))
+
+ return True
+ except Exception as e:
+ self.error_handler(e)
+ return False
+
+ def handle_connection(self, sock, *args):
+ try:
+ if self.conn:
+ return False
+ self.conn, addr = sock.accept()
+ self.debug('RTSP connection from: ' + str(addr))
+ self.remote_ip = addr[0]
+
+ if self.expected_remote_ip and self.remote_ip != self.expected_remote_ip:
+ self.conn.close()
+ self.conn = None
+ self.debug('Connection refused, bad source address')
+ return True
+
+ sock.close()
+ self.server = None
+ GLib.io_add_watch(self.conn.fileno(), GLib.IO_IN, self.handle_data_in)
+ GLib.io_add_watch(self.conn.fileno(), GLib.IO_HUP, self.handle_data_hup)
+
+ self._state = 'init'
+ self.source_handle_message()
+ return False
+ except Exception as e:
+ self.error_handler(e)
+ return False
+
+    def error(self, msg):
+        """Enter the 'failed' state and raise an RTSPException for msg.
+
+        The exception text names the state the error occurred in, so that
+        state is saved before the transition to 'failed'.  state_handler
+        runs (through enter_state) before the exception is raised.
+
+        Raises WFDRTSPServer.RTSPException, always.
+        """
+        failed_in = self._state
+        self.enter_state('failed')
+        self.debug('error: ' + msg)
+        raise WFDRTSPServer.RTSPException('State ' + failed_in + ': ' + msg)
+
+ def warning(self, msg):
+ self.debug('warning: ' + msg)
+ print('Warning: ' + msg + '\n')
+
+ def debug(self, msg):
+ pass
+
+ @property
+ def state(self):
+ return self._state
+
+ def enter_state(self, new_state):
+ self.debug('state change: ' + self._state + ' -> ' +
new_state)
+ self._state = new_state
+ self.state_handler()
+
+ @property
+ def ready(self):
+ return self._state in ['streaming', 'paused']
+
+ def sm_init(self):
+ self._state = 'waiting-rtsp'
+ self.local_params = {
+ 'wfd_audio_codecs': 'LPCM 00000003 00, AAC 00000001 00',
#example m3 resp, in m4 req we have 00000002 instead (and must send just one..???)
+ 'wfd_video_formats': '00 00 01 08 00000000 00000000 00000040 00
0000 0000 00 none none' # what libwds request the sink to set in M4
+ }
+ # 'wfd_video_formats': '00 00 01 01 00000001 00000000 00000000 00
0000 0000 00 none none' # example M3 resp and M4 req in the spec
+ # 'wfd_video_formats': '00 01 02 08 000194ab 00555555 00000fff 02
0000 00ff 11 0780 0438, 01 08 000194ab 00555555 00000fff 02 0000 00ff 11 0780 0438' #
what the beamer sink reports in M3
+ # 00 01 02 08 000194ab 00555555
00000fff 02 0000 00ff 11 0780 0438 ,
+ # 01 08 000194ab 00555555
00000fff 02 0000 00ff 11 0780 0438
(1920x1080)
+ # <res&fps-table-index> <supp-or-not> <profile>
<level> <CEAsupp> <VESAsupp> <HHsupp> <latency>
<min-slice> <slice-enc-prms> <fps-ctrl-supp> <max-hres>
<max-vres>
+ #IN M3: t37 01 t38 t39 t34 t35
t36 t33(5s) t40 t41
+ #IN M4 or if supp is 00: 00000000 00000000
00000000 none-ign none-ign
+ #
+ # res&fps-table-index:
+ # 00: CEA 640x480 p60 -- thats the value were always gonna use? hmm prefer
p30
+ # 01: VESA 800x600 p30
+
+ # min-slice: we dont support the slice encoding so always gonna set this to 00,
but recheck what our the h264 codec supports and how, and also limit this to 00 if sink
set it to 00
+
+ # fps-ctrl-supp: we dont support this so were always gonna send this as 00 in
M4
+ self.remote_params = {}
+ self.local_methods = [ 'org.wfa.wfd1.0', 'SET_PARAMETER',
'GET_PARAMETER', 'PLAY', 'SETUP', 'TEARDOWN' ]
+ self.presentation_url = [ 'rtsp://127.0.0.1/wfd1.0/streamid=0',
'none' ] # Table 88
+ self.session_stream_url = None
+ self.session_id = None
+ self.session_timeout = 60
+ self.local_cseq = 0
+ self.remote_cseq = None
+ self.last_method = None
+ self.last_require = []
+ self.last_params = []
+ self.remote_rtp_port = None
+ self.remote_rtcp_port = None
+ self.local_rtp_port = None
+ self.local_rtcp_port = None
+ self.use_tcp = None
+ self.rtp_pipeline = None
+ self.rtsp_keepalive = None
+ self.rtsp_keepalive_timeout = None
+ self.expected_remote_ip = None
+ self.remote_ip = None
+
+ def close(self):
+ # Avoid passing self to io watches so that the refcount can ever reach 0 and
+ # all this can be done in __del__
+ if self.rtsp_keepalive:
+ GLib.source_remove(self.rtsp_keepalive)
+ self.rtsp_keepalive = None
+ if self.rtsp_keepalive_timeout:
+ GLib.source_remove(self.rtsp_keepalive_timeout)
+ self.rtsp_keepalive_timeout = None
+ if self.rtp_pipeline:
+ self.rtp_pipeline.set_state(Gst.State.NULL)
+ self.rtp_pipeline = None
+ if self.server:
+ self.server.close()
+ self.server = None
+ if self.conn:
+ self.conn.close()
+ self.conn = None
+
+ def set_local_interface(self, new_value):
+ pass
+
+ def set_remote_ip(self, new_value):
+ self.expected_remote_ip = new_value
+
+ if self.conn and self.remote_ip != self.expected_remote_ip:
+ self.error_handler(WFDRTSPServer.RTSPException('Connection was from a
wrong IP')) # TODO: do this in an idle cb
+
+ def validate_msg(self, method, expected_method, status, reason, headers, target,
content):
+ if expected_method is None:
+ # Expected a response, not a request
+ if method is not None:
+ self.error('Received a "' + method + '" request
where a response was expected')
+ if status < 200 or status > 299:
+ self.error('Response status ' + str(status) + ' and reason:
' + reason)
+ if status != 200:
+ self.warning('Response status was ' + str(status) + '
("' + reason + '") in state ' + self._state)
+
+ try:
+ if int(headers['cseq']) != self.local_cseq:
+ self.error('Response CSeq doesn\'t match')
+ except:
+ self.error('Missing or unparsable CSeq in a response')
+
+ if self.last_method == 'OPTIONS':
+ if 'public' not in headers:
+ self.error('Missing "Public" header in OPTIONS
response')
+ public = [ m.strip() for m in
headers['public'].split(',') ]
+ missing = [ m for m in self.last_require if m not in public ]
+ if missing:
+ self.error('Missing required method(s) "' + '",
"'.join(missing) + '" in OPTIONS response')
+
+ if self.last_method == 'GET_PARAMETER':
+ params = {}
+ for line in content.split(b'\r\n'):
+ if b':' not in line:
+ continue
+ k, v = line.decode('utf8').split(':', 1)
+ if k.strip() in params:
+ self.error('Duplicate key "' + k + '" in
GET_PARAMETER response')
+ params[k.strip()] = v.strip()
+ missing = [ p for p in self.last_params if p not in params ]
+ if missing: # Not an error
+ self.warning('Missing key(s) "' + '",
"'.join(missing) + '" in GET_PARAMETER response')
+ self.remote_params.update(params)
+
+ return
+
+ if method is None:
+ self.error('Received an RTSP response where a ' + expected_method +
' was expected')
+
+ if method != expected_method:
+ self.error('Received a "' + method + '" request where a
' + expected_method + ' was expected')
+ try:
+ if self.remote_cseq is not None and int(headers['cseq']) <=
self.remote_cseq:
+ self.error('Unchanged CSeq in a new request')
+ self.remote_cseq = int(headers['cseq'])
+ except:
+ self.error('Missing or unparsable CSeq in a new request')
+ if method == 'OPTIONS' and 'require' not in headers:
+ self.error('Missing "Require" header in OPTIONS request')
+ elif method == 'SETUP' and 'transport' not in headers:
+ self.error('Missing "Transport" header in SETUP request')
+ elif method == 'SETUP' and (target not in self.presentation_url or target
== 'none'):
+ self.error('Unknown target "' + target + '" in SETUP
request')
+ elif method == 'PLAY' and ('session' not in headers or
headers['session'] != self.session_id):
+ self.error('Missing or invalid "Session" header in PLAY
request')
+ elif method == 'PLAY' and target != self.session_stream_url:
+ self.error('Unknown target "' + target + '" in PLAY
request')
+ elif method == 'PAUSE' and 'session' not in headers:
+ self.error('Missing "Session" header in PAUSE request')
+ elif method == 'PAUSE' and target != self.session_stream_url:
+ self.error('Unknown target "' + target + '" in PAUSE
request')
+ elif method == 'TEARDOWN' and 'session' not in headers:
+ self.error('Missing "Session" header in TEARDOWN request')
+ elif method == 'TEARDOWN' and target != self.session_stream_url:
+ self.error('Unknown target "' + target + '" in TEARDOWN
request')
+ elif method == 'SET_PARAMETER':
+ pass
+
+ def request(self, method, target, require=[], params=[]):
+ content = ''
+ cmd = method + ' ' + target + ' RTSP/1.0\r\n'
+
+ self.local_cseq += 1
+ cmd += 'CSeq: ' + str(self.local_cseq) + '\r\n'
+
+ if require:
+ cmd += 'Require: ' + ', '.join(require) + '\r\n'
+
+ if params:
+ if isinstance(params, collections.abc.Mapping):
+ content = ''.join([ k + ': ' + params[k] + '\r\n'
for k in params ])
+ else:
+ content = ''.join([ k + '\r\n' for k in params ])
+ content_type = 'text/parameters'
+
+ if content:
+ cmd += 'Content-Type: ' + content_type + '\r\n'
+ cmd += 'Content-Length: ' + str(len(content)) + '\r\n'
+
+ cmd += '\r\n'
+ self.tx_queue_append(cmd + content)
+ self.last_method = method
+ self.last_require = require
+ self.last_params = params
+
+ def response(self, public=[], session=None, transport=None):
+ cmd = 'RTSP/1.0 200 OK\r\n'
+
+ cmd += 'CSeq: ' + str(self.remote_cseq) + '\r\n'
+
+ if public:
+ cmd += 'Public: ' + ', '.join(public) + '\r\n'
+ if session is not None:
+ cmd += 'Session: ' + session + '\r\n'
+ if transport is not None:
+ cmd += 'Transport: ' + transport + '\r\n'
+
+ cmd += '\r\n'
+ self.tx_queue_append(cmd)
+
+ def parse_video_formats(self, value):
+ # TODO
+ pass
+
+ def parse_client_rtp_ports(self, value):
+ profile, rtp_p0_str, rtp_p1_str, mode = value.split()
+ try:
+ rtp_p0 = int(rtp_p0_str)
+ rtp_p1 = int(rtp_p1_str)
+ except:
+ self.error('Can\'t parse rtp-port in wfd-client-rtp-ports: ' +
value)
+ if rtp_p0 < 1 or rtp_p0 > 65535:
+ self.error('rtp-port0 not valid for Primary Sink: ' + rtp_p0_str)
+ if rtp_p1 != 0: # Table 90
+ self.error('rtp-port1 not valid for Primary Sink: ' + rtp_p1_str)
+ if profile not in ['RTP/AVP/UDP;unicast',
'RTP/AVP/TCP;unicast']:
+ self.error('Unknown RTP transport in wfd-client-rtp-ports: ' +
profile)
+ if mode != 'mode=play':
+ self.error('Unknown mode in wfd-client-rtp-ports: ' + mode)
+ self.remote_rtp_port = rtp_p0
+ self.use_tcp = (profile == 'RTP/AVP/TCP;unicast')
+
+ def parse_transport(self, value):
+ params = value.split(';')
+ if len(params) < 3:
+ self.error('Can\'t split SETUP Transport header into profile and port
numbers: ' + value)
+ profile = ';'.join(params[0:2])
+ if profile not in ['RTP/AVP/UDP;unicast',
'RTP/AVP/TCP;unicast']:
+ self.error('Unknown RTP transport in SETUP Transport header: ' +
profile)
+ if self.use_tcp != (profile == 'RTP/AVP/TCP;unicast'):
+ self.error('RTP transport in SETUP Transport header different from what
we sent in M4: ' + profile)
+ client_port_strs = [p for p in params[2:] if
p.startswith('client_port=')]
+ if len(client_port_strs) != 1:
+ self.error('Can\'t find client-port in SETUP Transport header: '
+ value)
+ client_ports = client_port_strs[0].split('=', 1)[1].split('-')
+ try:
+ rtp_port = int(client_ports[0])
+ if len(client_ports) > 1:
+ rtcp_port = int(client_ports[1])
+ except:
+ self.error('Can\'t parse client-port in SETUP Transport header: '
+ client_port_strs[0])
+ if rtp_port != self.remote_rtp_port:
+ self.error('client-port in SETUP Transport header doesn\'t match what
we sent in M4: ' + str(rtp_port))
+ if len(client_ports) > 1:
+ if rtcp_port < 1 or rtcp_port > 65535 or rtcp_port == rtp_port: #
Actually must be rtp_port + 1...
+ self.error('Optional RTCP port not valid in SETUP Transport header:
' + str(rtcp_port))
+ self.remote_rtcp_port = rtcp_port
+
+ def on_gst_message(self, bus, message):
+ t = message.type
+ if t == Gst.MessageType.EOS:
+ self.error('Gstreamer end-of-stream')
+ elif t == Gst.MessageType.STATE_CHANGED:
+ old, new, pending = message.parse_state_changed()
+ self.debug('Gstreamer state change for ' + message.src.name + '
from ' + str(old) + ' to ' + str(new) + ', pending=' + str(pending))
+ elif t == Gst.MessageType.INFO:
+ err, debug = message.parse_info()
+ self.debug('Gstreamer info for ' + message.src.name + ': ' +
str(err) + '\nDebug: ' + str(debug))
+ elif t == Gst.MessageType.WARNING:
+ err, debug = message.parse_warning()
+ self.debug('Gstreamer warning for ' + message.src.name + ': '
+ str(err) + '\nDebug: ' + str(debug))
+ elif t == Gst.MessageType.ERROR:
+ err, debug = message.parse_error()
+ self.error('Gstreamer error for ' + message.src.name + ': ' +
str(err) + '\nDebug: ' + str(debug))
+ else:
+ self.debug('Gstreamer message of type ' + str(t) + ' for ' +
message.src.name + ': ' + str(message))
+ return True
+
+ # Ask the video encoder to produce a key frame: look up the pipeline element
+ # named 'videnc' and send a custom downstream 'GstForceKeyUnit' event to its
+ # 'sink' pad.
+ # NOTE(review): the three-argument set_value() calls, Gst.event_new_custom and
+ # Gst.EVENT_CUSTOM_DOWNSTREAM could not be checked against the gi 'Gst' 1.0
+ # bindings this file imports -- confirm that these names exist there.
+ def gst_force_keyframe(self):
+ enc = self.rtp_pipeline.get_by_name(&#39;videnc&#39;)
+ sink = enc.get_static_pad(&#39;sink&#39;)
+ timestamp = Gst.CLOCK_TIME_NONE # can/should we use sink.query_position?
+
+ # Event payload: no specific timestamp, all-headers set to True
+ s = Gst.Structure(&#39;GstForceKeyUnit&#39;)
+ s.set_value(&#39;timestamp&#39;, timestamp, &#39;uint64&#39;)
+ s.set_value(&#39;stream-time&#39;, timestamp, &#39;uint64&#39;)
+ s.set_value(&#39;all-headers&#39;, True)
+ # TODO: can we also send this event directly to the element instead of the pad?
+ sink.send_event(Gst.event_new_custom(Gst.EVENT_CUSTOM_DOWNSTREAM, s))
+
+ def rtsp_keepalive_timeout_cb(self):
+ try:
+ self.rtsp_keepalive_timeout = None
+ self.error('Keep-alive response timed out')
+ except Exception as e:
+ self.error_handler(e)
+ return False
+
+ def rtsp_keepalive_cb(self):
+ try:
+ # Send M16
+ # May need to start being careful with other requests that may be running...
+ self.request('GET_PARAMETER', 'rtsp://localhost/wfd1.0')
+ self.rtsp_keepalive_timeout = GLib.timeout_add_seconds(5,
self.rtsp_keepalive_timeout_cb)
+ return True
+ except Exception as e:
+ self.error_handler(e)
+ return False
+
+ def source_handle_message(self, method=None, target=None, status=None, reason=None,
headers={}, content=None):
+ # TODO: check the 6s timeouts as per Section 6.5
+ # Source side M1-M8 simplified state machine
+ if self._state == 'init':
+ # Send M1
+ self.request('OPTIONS', '*',
require=['org.wfa.wfd1.0'])
+ self.enter_state('M1')
+ elif self._state == 'M1':
+ # Validate M1 response
+ self.validate_msg(method, None, status, reason, headers, None, content)
+ methods = [ m.strip() for m in headers['public'].split(',')
]
+ required = [ 'org.wfa.wfd1.0', 'SET_PARAMETER',
'GET_PARAMETER' ]
+ missing = [ m for m in required if m not in methods ]
+ if missing:
+ self.error('Missing required method(s) "' + '",
"'.join(missing) + '" in OPTIONS response')
+ self.enter_state('M2')
+ elif self._state == 'M2':
+ # Validate M2
+ self.validate_msg(method, 'OPTIONS', status, reason, headers, target,
content)
+ if target not in [ '*' ] + self.presentation_url:
+ self.error('Unknown OPTIONS target "' + target +
'"')
+ required = [ m.strip() for m in headers['require'].split(',')
]
+ missing = [ m for m in required if m not in self.local_methods ]
+ if missing:
+ self.error('Required methods in OPTIONS request that we don\'t
support: ' + ','.join(missing))
+
+ # Send M2 response
+ self.response(public=self.local_methods)
+ # Send M3
+ self.request('GET_PARAMETER', 'rtsp://localhost/wfd1.0',
params=['wfd_audio_codecs', 'wfd_video_formats',
'wfd_client_rtp_ports', 'wfd_display_edid',
'wfd_uibc_capability'])
+ self.enter_state('M3')
+ elif self._state == 'M3':
+ # Validate M3 response
+ self.validate_msg(method, None, status, reason, headers, None, content)
+ if 'wfd_video_formats' not in self.remote_params or
'wfd_client_rtp_ports' not in self.remote_params:
+ self.error('Required parameters missing from GET_PARAMETER
response')
+ self.parse_video_formats(self.remote_params['wfd_video_formats'])
+
self.parse_client_rtp_ports(self.remote_params['wfd_client_rtp_ports'])
+ # Send M4
+ params = {
+ 'wfd_video_formats':
self.local_params['wfd_video_formats'],
+ 'wfd_client_rtp_ports':
self.remote_params['wfd_client_rtp_ports'],
+ 'wfd_presentation_URL': self.presentation_url[0] + ' ' +
self.presentation_url[1],
+ # TODO: include wfd_audio_codecs if audio present, make video optional,
too
+ # TODO: support wfd2_video_formats and wfd_audio_codecs
+ }
+ self.request('SET_PARAMETER', 'rtsp://localhost/wfd1.0',
params=params)
+ self.enter_state('M4')
+ elif self._state == 'M4':
+ # Validate M4 response
+ self.validate_msg(method, None, status, reason, headers, None, content)
+ # Send M5
+ self.request('SET_PARAMETER', 'rtsp://localhost/wfd1.0',
params={'wfd_trigger_method': 'SETUP'})
+ self.enter_state('M5')
+ elif self._state == 'M5':
+ # Validate M5 response
+ self.validate_msg(method, None, status, reason, headers, None, content)
+ self.enter_state('M6')
+ elif self._state == 'M6':
+ # Validate M6
+ self.validate_msg(method, 'SETUP', status, reason, headers, target,
content)
+ self.parse_transport(headers['transport'])
+ self.session_stream_url = target
+ self.session_id = str(random.randint(a=1, b=999999))
+ self.local_rtp_port = random.randint(a=20000, b=30000)
+ if self.remote_rtcp_port is not None:
+ self.local_rtcp_port = self.local_rtp_port + 1
+ profile ='RTP/AVP/TCP;unicast' if self.use_tcp else
'RTP/AVP/UDP;unicast'
+ client_port = str(self.remote_rtp_port) + (('-' +
str(self.remote_rtcp_port)) if self.remote_rtcp_port is not None else '')
+ server_port = str(self.local_rtp_port) + (('-' +
str(self.local_rtcp_port)) if self.local_rtcp_port is not None else '')
+ transport = profile + ';client_port' + client_port +
';server_port=' + server_port
+ # Section B.1
+ pipeline = ('ximagesrc name=src use-damage=false do-timestamp=true !
capsfilter name=fps caps=video/x-raw,framerate=10/1' +
+ ' ! videoscale method=0 ! capsfilter name=res
caps=video/x-raw,width=800,height=600' +
+ ' ! videoconvert ! video/x-raw,format=I420 ! x264enc tune=zerolatency
speed-preset=ultrafast name=videnc' +
+ ' ! queue' + # TODO: add leaky=downstream
+ ' ! mpegtsmux name=mux' +
+ ' ! rtpmp2tpay pt=33 mtu=1472 ! .send_rtp_sink rtpsession
name=session .send_rtp_src' +
+ ' ! udpsink host=' + self.remote_ip + ' port=' +
str(self.remote_rtp_port) + ' bind-port=' + str(self.local_rtp_port)) # TODO:
bind-address
+
+ if self.local_rtcp_port is not None:
+ pipeline += ' session.send_rtcp_src ! udpsink name=rtcp_sink
host=' + self.remote_ip + \
+ ' port=' + str(self.remote_rtcp_port) + ' bind-port='
+ str(self.local_rtcp_port) # TODO: bind-address
+
+ self.rtp_pipeline = Gst.parse_launch(pipeline)
+ bus = self.rtp_pipeline.get_bus()
+ bus.enable_sync_message_emission()
+ bus.add_signal_watch()
+ bus.connect('sync-message', self.on_gst_message)
+
+ # Send M6 response
+ self.response(session=self.session_id + ';timeout=' +
str(self.session_timeout), transport=transport)
+ self.enter_state('M7')
+ elif self._state in ['M7', 'paused']:
+ # Validate M7
+ self.validate_msg(method, 'PLAY', status, reason, headers, target,
content)
+ # Send M7 response
+ self.response()
+ self.rtp_pipeline.set_state(Gst.State.PLAYING)
+ # Set up the keep-alive timer, interval must be less than timeout minus 5
seconds
+ self.rtsp_keepalive = GLib.timeout_add_seconds(self.session_timeout - 10,
self.rtsp_keepalive_cb)
+ self.enter_state('streaming')
+ elif self._state == 'streaming':
+ if method is None:
+ if self.rtsp_keepalive_timeout:
+ # The M16 response is not to be validated (Section 6.4.16)
+ GLib.source_remove(self.rtsp_keepalive_timeout)
+ self.rtsp_keepalive_timeout = None
+ return
+ self.error('Received an RTSP response where a request was
expected')
+ if method == 'PAUSE':
+ self.validate_msg(method, 'PAUSE', status, reason, headers,
target, content)
+ self.rtp_pipeline.set_state(Gst.State.PAUSED)
+ self.enter_state('paused')
+ self.response()
+ return
+ if method == 'SET_PARAMETER':
+ # TODO: parse the stuff, on 'wfd-idr-request\r\n' (no semicolon)
call the following:
+ self.gst_force_keyframe()
+ self.response()
+ return
+ if method == 'TEARDOWN':
+ # The spec suggests a more graceful teardown but we just close the
connection
+ self.error('Teardown requested')
+ self.error('Unsupported method "' + method + '"')
+
+# Names of the D-Bus interfaces looked up on objects of the net.connman.iwd
+# service
+WIPHY_IF = &#39;net.connman.iwd.Adapter&#39;
+DEVICE_IF = &#39;net.connman.iwd.p2p.Device&#39;
+PEER_IF = &#39;net.connman.iwd.p2p.Peer&#39;
+WSC_IF = &#39;net.connman.iwd.SimpleConfiguration&#39;
+WFD_IF = &#39;net.connman.iwd.p2p.Display&#39;
+SVC_MGR_IF = &#39;net.connman.iwd.p2p.ServiceManager&#39;
+
+class WFDSource(Gtk.Window):
+ @dataclasses.dataclass
+ class Device:
+ props: dict
+ dev_proxy: dbus.Interface
+ props_proxy: dbus.Interface
+ peers: dict
+ sorted_peers: list
+ widget: Gtk.Widget
+ expanded: bool
+ scan_request: bool
+ selected_peer: object
+ connecting_peer: object
+ disconnecting_peer: object
+ connected: list
+ dbus_call: dbus.lowlevel.PendingCall
+
+ @dataclasses.dataclass
+ class Peer:
+ peer_proxy: dbus.Interface
+ wfd_proxy: dbus.Interface
+ wsc_proxy: dbus.Interface
+ widget: Gtk.Widget
+ rtsp: WFDRTSPServer
+
+ def __init__(self):
+ Gtk.Window.__init__(self, type=Gtk.WindowType.TOPLEVEL, title='WFD
Source')
+ self.set_decorated(True)
+ self.set_resizable(False)
+ self.connect('destroy', self.on_destroy, "WM destroy")
+ self.set_size_request(900, 300)
+ self.device_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
+ leftscroll = Gtk.ScrolledWindow(hscrollbar_policy=Gtk.PolicyType.NEVER)
+ leftscroll.add(self.device_box)
+ self.infolabel1 = Gtk.Label()
+ self.infolabel1.set_ellipsize(Pango.EllipsizeMode.START)
+ infopane = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
+ infopane.pack_start(self.infolabel1, False, False, padding=10)
+ rightscroll = Gtk.ScrolledWindow(hscrollbar_policy=Gtk.PolicyType.NEVER,
vscrollbar_policy=Gtk.PolicyType.NEVER)
+ rightscroll.add(infopane)
+ paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL)
+ paned.pack1(leftscroll, True, True)
+ paned.pack2(rightscroll, False, False)
+ paned.set_wide_handle(True)
+ paned.props.position = 400
+ paned.props.position_set = True
+ self.add(paned)
+ self.show_all()
+ self.connect('notify::is-active', self.on_notify_is_active)
+
+ self.rtsp_port = 7236
+ self.devices = None
+ self.objects = {}
+ self.dbus = dbus.SystemBus()
+ self.dbus.watch_name_owner('net.connman.iwd', self.on_name_owner_change)
+ self.on_name_owner_change('dummy' if
self.dbus.name_has_owner('net.connman.iwd') else '')
+
+ # Handle the net.connman.iwd bus name gaining or losing its owner.  new_name
+ # is empty when the name has no owner.  Returns True on every path.
+ def on_name_owner_change(self, new_name):
+ if not new_name:
+ # Service gone; nothing to do if it was never seen
+ if self.devices is None:
+ return True
+
+ for dev_path in self.devices:
+ device = self.devices[dev_path]
+ if device.connecting_peer or device.disconnecting_peer:
+ device.dbus_call.cancel()
+
+ for peer_path in device.peers:
+ peer = device.peers[peer_path]
+ if peer.rtsp:
+ peer.rtsp.close()
+
+ # Forget all objects, refresh the device list and stop listening for signals
+ self.devices = None
+ self.objects = {}
+ self.populate_devices()
+ self.dbus.remove_signal_receiver(self.on_properties_changed)
+ self.dbus.remove_signal_receiver(self.on_interfaces_added)
+ self.dbus.remove_signal_receiver(self.on_interfaces_removed)
+ return True
+
+ # Service present; nothing to do if the objects are already loaded
+ if self.devices is not None:
+ return True
+
+ manager = dbus.Interface(self.dbus.get_object(&#39;net.connman.iwd&#39;,
&#39;/&#39;), &#39;org.freedesktop.DBus.ObjectManager&#39;)
+ self.devices = {}
+ self.objects = manager.GetManagedObjects()
+
+ # All devices are added before any peer because add_peer() ignores a peer
+ # whose device is not in self.devices
+ for path in self.objects:
+ if DEVICE_IF in self.objects[path]:
+ self.add_dev(path)
+ for path in self.objects:
+ if PEER_IF in self.objects[path]:
+ self.add_peer(path)
+
+ self.populate_devices()
+
+ self.dbus.add_signal_receiver(self.on_properties_changed,
+ bus_name="net.connman.iwd",
+ dbus_interface="org.freedesktop.DBus.Properties",
+ signal_name="PropertiesChanged",
+ path_keyword="path")
+ self.dbus.add_signal_receiver(self.on_interfaces_added,
+ bus_name="net.connman.iwd",
+ dbus_interface="org.freedesktop.DBus.ObjectManager",
+ signal_name="InterfacesAdded")
+ self.dbus.add_signal_receiver(self.on_interfaces_removed,
+ bus_name="net.connman.iwd",
+ dbus_interface="org.freedesktop.DBus.ObjectManager",
+ signal_name="InterfacesRemoved")
+
+ # Register as a display source, passing the RTSP port
+ svc_mgr = dbus.Interface(self.dbus.get_object(&#39;net.connman.iwd&#39;,
&#39;/net/connman/iwd&#39;), SVC_MGR_IF)
+ svc_mgr.RegisterDisplayService({
+ &#39;Source&#39;: True,
+ &#39;Port&#39;: dbus.UInt16(self.rtsp_port)
+ })
+
+ return True
+
+ def add_dev(self, path):
+ obj_proxy = self.dbus.get_object('net.connman.iwd', path)
+ # Default to expanded for first device found
+ expanded = len(self.devices) == 0
+ self.devices[path] = WFDSource.Device(
+ props=self.objects[path][DEVICE_IF],
+ dev_proxy=dbus.Interface(obj_proxy, DEVICE_IF),
+ props_proxy=dbus.Interface(obj_proxy,
'org.freedesktop.DBus.Properties'),
+ peers={},
+ sorted_peers=[],
+ widget=None,
+ expanded=expanded,
+ scan_request=False,
+ selected_peer=None,
+ connecting_peer=None,
+ disconnecting_peer=None,
+ connected=[],
+ dbus_call=None)
+
+ def add_peer(self, path):
+ dev_path = self.objects[path][PEER_IF]['Device']
+ if dev_path not in self.devices or path in self.devices[dev_path].peers:
+ return False
+
+ self.devices[dev_path].peers[path] = WFDSource.Peer(
+ peer_proxy=None,
+ wfd_proxy=None,
+ wsc_proxy=None,
+ widget=None,
+ rtsp=None)
+ return True
+
+ # PropertiesChanged signal handler: merge the change into self.objects, then
+ # update the device and peer state that depends on the object at path.
+ # NOTE(review): the indentation of this patch was lost, so the nesting of the
+ # statements below can only be inferred -- check against the original file.
+ def on_properties_changed(self, interface, changed, invalidated, path):
+ # Make sure there is a property dict for this object and interface
+ if path not in self.objects:
+ self.objects[path] = {}
+ if interface not in self.objects[path]:
+ self.objects[path][interface] = {}
+
+ self.objects[path][interface].update(changed)
+ for prop in invalidated:
+ if prop in self.objects[path][interface]:
+ del self.objects[path][interface][prop]
+
+ # The object is one of our devices
+ if path in self.devices:
+ self.update_dev_props(path)
+ if interface == DEVICE_IF and &#39;AvailableConnections&#39; in changed:
+ self.update_selected_peer(path)
+
+ # The object is a peer; its device path is in the peer&#39;s Device property
+ if PEER_IF in self.objects[path]:
+ dev_path = self.objects[path][PEER_IF][&#39;Device&#39;]
+ if dev_path in self.devices:
+ device = self.devices[dev_path]
+ if path in device.peers:
+ peer = device.peers[path]
+ # Keep device.connected in sync with the peer&#39;s Connected property
+ if interface == PEER_IF and &#39;Connected&#39; in changed:
+ if changed[&#39;Connected&#39;] and peer not in
device.connected:
+ device.connected.append(peer)
+ elif not changed[&#39;Connected&#39;] and peer in
device.connected:
+ device.connected.remove(peer)
+ self.update_dev_props(dev_path)
+ self.update_peer_props(dev_path, path)
+ if peer != device.selected_peer:
+ self.update_selected_peer(dev_path)
+ # Pass the connection&#39;s interface and IP on to the peer&#39;s RTSP server
+ if interface == PEER_IF and peer.rtsp:
+ if &#39;ConnectedInterface&#39; in changed:
+
peer.rtsp.set_local_interface(changed[&#39;ConnectedInterface&#39;])
+ if &#39;ConnectedIp&#39; in changed:
+ peer.rtsp.set_remote_ip(changed[&#39;ConnectedIp&#39;])
+
+ self.update_peer_props(dev_path, path)
+
+ return True
+
+ # InterfacesAdded signal handler: record the new interfaces of the object at
+ # path and create the matching device or peer entry.
+ # NOTE(review): the indentation of this patch was lost, so the nesting of the
+ # statements below can only be inferred -- check against the original file.
+ def on_interfaces_added(self, path, interfaces):
+ if path not in self.objects:
+ self.objects[path] = {}
+ self.objects[path].update(interfaces)
+
+ if DEVICE_IF in interfaces:
+ self.add_dev(path)
+ # This should happen rarely enough that we can repopulate the whole list
+ self.populate_devices()
+
+ # add_peer() returns True only if a new peer entry was created
+ update_dev_props = False
+ if PEER_IF in interfaces:
+ update_dev_props = self.add_peer(path)
+
+ if PEER_IF in self.objects[path]:
+ dev_path = self.objects[path][PEER_IF][&#39;Device&#39;]
+ if dev_path in self.devices:
+ if update_dev_props:
+ # Update device&#39;s peer count
+ self.update_dev_props(dev_path)
+ self.update_peer_props(dev_path, path)
+
+ # InterfacesRemoved signal handler: drop the removed interfaces of the object
+ # at path from self.objects and remove the matching device or peer entry.
+ # NOTE(review): the indentation of this patch was lost, so the nesting of the
+ # statements below can only be inferred -- check against the original file.
+ def on_interfaces_removed(self, path, interfaces):
+ if path not in self.objects:
+ return
+
+ # Look up the peer&#39;s device before the properties are deleted below
+ dev_path = None
+ if PEER_IF in interfaces or WFD_IF in interfaces or WSC_IF in interfaces:
+ if PEER_IF in self.objects[path]:
+ dev_path = self.objects[path][PEER_IF][&#39;Device&#39;]
+
+ for i in interfaces:
+ if i in self.objects[path]:
+ del self.objects[path][i]
+ if len(self.objects[path]) == 0:
+ del self.objects[path]
+
+ if DEVICE_IF in interfaces and path in self.devices:
+ device = self.devices[path]
+ if device.connecting_peer or device.disconnecting_peer:
+ device.dbus_call.cancel()
+ # TODO: check if connected
+ del self.devices[path]
+ # This should happen rarely enough that we can repopulate the whole list
+ self.populate_devices()
+
+ if dev_path is not None and dev_path in self.devices:
+ device = self.devices[dev_path]
+ if path in device.peers:
+ # Make sure the widget is removed
+ self.update_peer_props(dev_path, path)
+ if PEER_IF in interfaces:
+ del device.peers[path]
+ # Update device&#39;s peer count
+ self.update_dev_props(dev_path)
+
+    def populate_devices(self):
+        """Rebuild the contents of self.device_box from self.devices.
+
+        A placeholder label is shown when self.devices is None or empty.
+        Otherwise one Gtk.Expander is added per device: its label widget is a
+        box holding the description label and the enable switch, and its child
+        is a framed Gtk.ListBox that receives the peer rows.
+        """
+        self.device_box.foreach(lambda x, y: self.device_box.remove(x), None)
+
+        placeholder = None
+        if self.devices is None:
+            placeholder = "Not connected to IWD"
+        elif len(self.devices) == 0:
+            placeholder = "No P2P-capable adapters :-("
+        if placeholder is not None:
+            label = Gtk.Label(label=placeholder)
+            self.device_box.pack_start(label, expand=True, fill=True, padding=0)
+            self.device_box.show_all()
+            return
+
+        # Basically implement Gtk.Expander's set_label_fill which for some reason
+        # doesn't do anything.  Use size-allocate because configure-event doesn't
+        # work either...
+        def on_exp_resize(widget, allocation):
+            # Work on the expander that emitted the signal.  Closing over the loop
+            # variables instead would make every handler resize the label box of
+            # the last device only.
+            label_box = widget.get_label_widget()
+            if label_box is None:
+                return False
+            if self.margin_left is None:
+                self.margin_left = label_box.get_allocation().x
+            posx, posy = widget.translate_coordinates(self, 0, 0)
+            # Add posx to force the label widget (box) to be aligned to the left
+            # side of the window even if GTK already decided to push the expander
+            # off the left side with a negative allocation.x.  This way it won't
+            # push it any further left as the available space shrinks.
+            width = posx + widget.get_allocated_width() - self.margin_left - 1
+            label_box.set_size_request(max(width, 0), -1)
+            return False
+
+        for path in self.devices:
+            label = Gtk.Label()
+            label.set_halign(Gtk.Align.START)
+            label.set_line_wrap(False)
+            label.set_single_line_mode(False)
+            label.set_ellipsize(Pango.EllipsizeMode.END)
+            switch = Gtk.Switch()
+            switch.connect('state-set', self.on_dev_enabled, path)
+            switch.set_halign(Gtk.Align.END)
+            switch.set_valign(Gtk.Align.START)
+            box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
+            box.pack_start(label, expand=True, fill=True, padding=0)
+            box.pack_end(switch, expand=False, fill=False, padding=0)
+            peer_list = Gtk.ListBox() # can also use an IconView.. or make it switchable
+            peer_list.set_size_request(150, 120)
+            peer_list.set_selection_mode(Gtk.SelectionMode.SINGLE)
+            peer_list.set_placeholder(Gtk.Label(label='No Wi-Fi Displays discovered yet...'))
+            peer_list.connect('row-selected', self.on_peer_selected, path)
+            frame = Gtk.Frame()
+            frame.props.margin = 10
+            frame.add(peer_list)
+            expander = Gtk.Expander()
+            expander.set_label_fill(True)
+            expander.set_expanded(self.devices[path].expanded)
+            expander.set_label_widget(box)
+            expander.add(frame)
+            expander.connect('notify::expanded', self.on_dev_expanded, path)
+            expander.show_all()
+            self.device_box.add(expander)
+            self.devices[path].widget = expander
+            self.update_dev_props(path)
+            GLib.idle_add(self.expander_workaround, expander)
+
+            # NOTE(review): update_peer_props() only inserts a row when
+            # peer.widget is None, so peers that already had a row before this
+            # rebuild are refreshed but not re-added to the new list -- confirm
+            # whether that is intended.
+            for peer_path in self.devices[path].peers:
+                self.update_peer_props(path, peer_path)
+
+            # Forget the cached left margin so it is measured again after the rebuild
+            self.margin_left = None
+            expander.connect('size-allocate', on_exp_resize)
+
+    def expander_workaround(self, widget):
+        """Detach the expander's label widget and attach it again.
+
+        Scheduled through GLib.idle_add() by populate_devices(); always returns
+        False.
+        """
+        label_widget = widget.get_label_widget()
+        for replacement in (None, label_widget):
+            widget.set_label_widget(replacement)
+        return False
+
+    def update_dev_props(self, path):
+        """Refresh the expander header (description label and switch) of a device.
+
+        The state line is derived, in priority order, from the 'Enabled'
+        property, a pending disconnect, a pending connect, the connected peers
+        (and whether their RTSP sessions are ready) and the discovery request.
+        """
+        device = self.devices[path]
+        if not device.props['Enabled']:
+            state = 'disabled'
+        elif device.disconnecting_peer is not None:
+            state = 'disconnecting...'
+        elif device.connecting_peer is not None:
+            state = 'connecting...'
+        elif len(device.connected) > 0:
+            if all(not peer.rtsp or peer.rtsp.ready for peer in device.connected):
+                state = 'connected'
+            else:
+                state = 'negotiating...'
+        elif device.scan_request:
+            state = 'discovering... (' + str(len(device.peers)) + ')'
+        else:
+            state = 'idle'
+
+        label, switch = device.widget.get_label_widget().get_children()
+        dev_str = str(self.get_dev_string(path))
+        name = str(device.props['Name'])
+        # set_markup() parses its argument as Pango markup, so the free-form
+        # strings must be escaped or a '&' or '<' in a name breaks the label.
+        markup = GLib.markup_escape_text(dev_str, -1) + '\n<small>'
+        if dev_str != name:
+            markup += 'Local name: ' + GLib.markup_escape_text(name, -1) + '\n'
+        markup += 'State: ' + state + '</small>'
+        label.set_markup(markup)
+        switch.set_active(device.props['Enabled'])
+
+    def update_peer_props(self, dev_path, path):
+        """Create, remove or refresh the list row of the peer at *path*.
+
+        A row is only kept for peers whose cached object has PEER_IF, WFD_IF and
+        WSC_IF and whose WFD 'Sink' property is true.  When such a peer has no
+        row yet, the D-Bus proxies and the row are created; when a peer with a
+        row stops qualifying, the row is removed and all connection state that
+        refers to the peer is reset.  Otherwise the row's label and button are
+        updated and, for the selected peer, the info pane as well.
+        """
+        device = self.devices[dev_path]
+        peer = device.peers[path]
+        props = self.objects[path] if path in self.objects else {}
+        peer_list = device.widget.get_child().get_child()
+        is_sink = (PEER_IF in props and WFD_IF in props and WSC_IF in props and
+                   bool(props[WFD_IF]['Sink']))
+
+        if peer.widget is None:
+            if not is_sink:
+                return
+
+            # Rows are kept ordered by name; sorted_peers mirrors the row order
+            name = str(props[PEER_IF]['Name'])
+            device.sorted_peers.append(name)
+            device.sorted_peers.sort()
+            index = device.sorted_peers.index(name)
+
+            obj_proxy = self.dbus.get_object('net.connman.iwd', path)
+            peer.peer_proxy = dbus.Interface(obj_proxy, PEER_IF)
+            peer.wfd_proxy = dbus.Interface(obj_proxy, WFD_IF)
+            peer.wsc_proxy = dbus.Interface(obj_proxy, WSC_IF)
+            label = Gtk.Label()
+            label.set_halign(Gtk.Align.START)
+            label.set_single_line_mode(True)
+            label.set_ellipsize(Pango.EllipsizeMode.END)
+            event_box = Gtk.EventBox()
+            event_box.add(label)
+            event_box.connect('button-press-event', self.on_peer_click, (dev_path, path))
+            button = Gtk.Button()
+            button.set_use_stock(True)
+            button.connect('clicked', self.on_peer_button, (dev_path, path))
+            box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
+            box.props.margin = 5
+            box.pack_start(event_box, expand=True, fill=True, padding=0)
+            box.pack_end(button, expand=False, fill=False, padding=0)
+            peer.widget = Gtk.ListBoxRow()
+            peer.widget.add(box)
+            peer_list.insert(peer.widget, index)
+            peer.widget.show_all()
+        elif not is_sink:
+            del device.sorted_peers[peer.widget.get_index()]
+            peer_list.remove(peer.widget)
+            if peer == device.selected_peer:
+                device.selected_peer = None
+                self.update_info(dev_path, None)
+            if peer == device.connecting_peer:
+                device.dbus_call.cancel()
+                device.connecting_peer = None
+                self.update_selected_peer(dev_path)
+            if peer == device.disconnecting_peer:
+                device.dbus_call.cancel()
+                device.disconnecting_peer = None
+                self.update_selected_peer(dev_path)
+            if peer in device.connected:
+                device.connected.remove(peer)
+                self.update_selected_peer(dev_path)
+            peer.peer_proxy = None
+            peer.wfd_proxy = None
+            peer.wsc_proxy = None
+            peer.widget = None
+            if peer.rtsp:
+                peer.rtsp.close()
+                peer.rtsp = None
+            return
+
+        subcat = 'unknown type'
+        if 'DeviceSubcategory' in props[PEER_IF]:
+            subcat = props[PEER_IF]['DeviceSubcategory']
+
+        weight = 'heavy' if peer in device.connected else 'normal'
+        box = peer.widget.get_child()
+        event_box, button = box.get_children()
+        label, = event_box.get_children()
+        # The name and subcategory describe the remote device and end up in
+        # Pango markup, so escape them before building the label.
+        label.set_markup('<span weight="' + weight + '">' +
+                         GLib.markup_escape_text(str(props[PEER_IF]['Name']), -1) +
+                         '</span> <span foreground="grey" size="small">' +
+                         GLib.markup_escape_text(str(subcat), -1) + '</span>')
+
+        if device.disconnecting_peer or (device.connecting_peer and
+                                         peer != device.connecting_peer):
+            # This peer's row should not have any buttons
+            button.hide()
+        elif peer == device.connecting_peer:
+            button.set_label('Cancel')
+            button.show()
+        elif peer in device.connected:
+            if not peer.rtsp or peer.rtsp.ready:
+                button.set_label('Disconnect')
+            else:
+                button.set_label('Cancel')
+            button.show()
+        elif peer == device.selected_peer and device.props['AvailableConnections'] > 0:
+            button.set_label('Connect')
+            button.show()
+        else:
+            button.hide()
+
+        if peer == device.selected_peer:
+            self.update_info(dev_path, path)
+
+    def update_selected_peer(self, dev_path):
+        """Refresh the row of the device's currently selected peer, if there is one."""
+        device = self.devices[dev_path]
+        selected = device.selected_peer
+        if not selected:
+            return
+        self.update_peer_props(dev_path, self.get_peer_path(device, selected))
+
+    def update_info(self, dev_path, path):
+        """Show details about the peer at *path* in self.infolabel1.
+
+        Passing None as *path* clears the label.
+        """
+        device = self.devices[dev_path]
+        if path is None:
+            self.infolabel1.set_text('')
+            return
+
+        peer = device.peers[path]
+
+        if peer == device.connecting_peer:
+            state = 'IWD connecting'
+        elif peer == device.disconnecting_peer:
+            state = 'disconnecting'
+        elif peer not in device.connected:
+            state = 'not connected'
+        elif peer.rtsp is None:
+            state = 'connected'
+        elif peer.rtsp.ready:
+            state = peer.rtsp.state
+        else:
+            state = 'RTSP negotiation: ' + peer.rtsp.state
+
+        def yes_no(flag):
+            return 'yes' if flag else 'no'
+
+        peer_props = self.objects[path][PEER_IF]
+        subcat = 'unknown'
+        if 'DeviceSubcategory' in peer_props:
+            subcat = peer_props['DeviceSubcategory']
+
+        # One entry per displayed line; each line ends with a newline
+        lines = [
+            'Connection state: ' + state,
+            'Device category: ' + peer_props['DeviceCategory'],
+            'Device subcategory: ' + subcat,
+        ]
+
+        if WFD_IF in self.objects[path]:
+            wfd = self.objects[path][WFD_IF]
+            if not wfd['Source']:
+                role = 'sink'
+            elif wfd['Sink']:
+                role = 'dual-role'
+            else:
+                role = 'source'
+            lines.append('WFD device type: ' + role)
+
+            if wfd['Sink']:
+                lines.append('Audio: ' + yes_no(wfd['HasAudio']))
+
+            lines.append('UIBC: ' + yes_no(wfd['HasUIBC']))
+            lines.append('Content protection: ' + yes_no(wfd['HasContentProtection']))
+
+        self.infolabel1.set_text('\n'.join(lines) + '\n')
+        # TODO: more info in labels 2 and so on
+
+    # Direct method calls on dbus.Interface's don't return dbus.lowlevel.PendingCall
+    # objects so we have to use bus.call_async to make cancellable async calls
+    def async_call(self, proxy, method, signature='', *args, **kwargs):
+        """Call *method* on *proxy* through self.dbus.call_async().
+
+        The extra positional arguments are passed on as the method's argument
+        tuple and the keyword arguments (reply_handler, error_handler, timeout,
+        ...) are forwarded unchanged.  Returns the result of call_async().
+        """
+        target = (proxy.bus_name, proxy.object_path, proxy.dbus_interface)
+        return self.dbus.call_async(*target, method, signature, args, **kwargs)
+
+    def connect_peer(self, dev_path, path):
+        """Start connecting to the peer at *path* using WSC push-button.
+
+        Issues the asynchronous 'PushButton' call, marks the peer as the
+        device's connecting peer, creates the peer's WFDRTSPServer and
+        refreshes the affected widgets.  Failures of the D-Bus call or of the
+        RTSP session are reported in an error dialog.
+        """
+        device = self.devices[dev_path]
+        peer = device.peers[path]
+
+        def refresh():
+            # Redraw the device header, this peer's row and the selected peer's row
+            self.update_dev_props(dev_path)
+            self.update_peer_props(dev_path, path)
+            if peer != device.selected_peer:
+                self.update_selected_peer(dev_path)
+
+        def show_error(title, before_name, after_name, excp):
+            # Non-blocking error dialog that destroys itself on any response
+            dialog = Gtk.MessageDialog(parent=self, message_type=Gtk.MessageType.ERROR,
+                                       buttons=Gtk.ButtonsType.OK, text=title)
+            dialog.format_secondary_text(before_name + self.objects[path][PEER_IF]['Name'] +
+                                         after_name + repr(excp))
+            dialog.show()
+
+            def on_ok(response, *args):
+                dialog.destroy()
+
+            dialog.connect('response', on_ok)
+
+        def on_reply():
+            device.connected.append(peer)
+            device.connecting_peer = None
+            # Local interface and remote IP get set in the PropertiesChanged handler
+            refresh()
+
+        def on_error(excp):
+            device.connecting_peer = None
+            if peer.rtsp:
+                peer.rtsp.close()
+                peer.rtsp = None
+            refresh()
+            show_error('Connection failed', 'Connection to ', ' failed: ', excp)
+
+        def on_rtsp_state():
+            refresh()
+
+        def on_rtsp_error(excp):
+            self.disconnect_peer(dev_path, path)
+            show_error('Negotiation failed', 'RTSP error when talking to ', ': ', excp)
+
+        # Cannot use peer.wsc_proxy.PushButton()
+        device.dbus_call = self.async_call(peer.wsc_proxy, 'PushButton',
+                                           reply_handler=on_reply,
+                                           error_handler=on_error, timeout=120)
+        device.connecting_peer = peer
+        # Create the RTSP server now so it's ready as soon as the P2P connection
+        # succeeds even if we haven't received the DBus reply yet
+        peer.rtsp = WFDRTSPServer(self.rtsp_port, on_rtsp_state, on_rtsp_error)
+        refresh()
+
+    def disconnect_peer(self, dev_path, path):
+        """Disconnect from the peer at *path*, or cancel a pending connection to it.
+
+        Cancels the pending D-Bus call if the peer is still connecting, removes
+        the peer from the connected list, closes its RTSP server and issues an
+        asynchronous 'Disconnect' call.  Errors other than
+        net.connman.iwd.NotConnected are shown in a dialog.
+        """
+        device = self.devices[dev_path]
+        peer = device.peers[path]
+
+        def refresh():
+            # Redraw the device header, this peer's row and the selected peer's row
+            self.update_dev_props(dev_path)
+            self.update_peer_props(dev_path, path)
+            if peer != device.selected_peer:
+                self.update_selected_peer(dev_path)
+
+        def on_reply():
+            device.disconnecting_peer = None
+            refresh()
+
+        def on_error(excp):
+            device.disconnecting_peer = None
+            refresh()
+
+            # Nothing to report if IWD says there was no connection to drop
+            already_gone = (isinstance(excp, dbus.exceptions.DBusException) and
+                            excp.get_dbus_name() == 'net.connman.iwd.NotConnected')
+            if already_gone:
+                return
+
+            dialog = Gtk.MessageDialog(parent=self, message_type=Gtk.MessageType.ERROR,
+                                       buttons=Gtk.ButtonsType.OK, text='Disconnecting failed')
+            dialog.format_secondary_text('Disconnecting from ' +
+                                         self.objects[path][PEER_IF]['Name'] +
+                                         ' failed: ' + repr(excp))
+            dialog.show()
+
+            def on_ok(response, *args):
+                dialog.destroy()
+
+            dialog.connect('response', on_ok)
+
+        if peer == device.connecting_peer:
+            device.dbus_call.cancel()
+            device.connecting_peer = None
+
+        if peer in device.connected:
+            device.connected.remove(peer)
+
+        if peer.rtsp:
+            peer.rtsp.close()
+            peer.rtsp = None
+
+        device.dbus_call = self.async_call(peer.peer_proxy, 'Disconnect',
+                                           reply_handler=on_reply, error_handler=on_error)
+        device.disconnecting_peer = peer
+        refresh()
+
+    def on_peer_click(self, widget, event, data):
+        """Connect to a peer when its row is double-clicked with button 1.
+
+        *data* is the (dev_path, path) tuple registered with the handler.
+        Returns False for any other kind of button event and True otherwise.
+        """
+        is_double_click = event.button == 1 and event.type == Gdk.EventType._2BUTTON_PRESS
+        if not is_double_click:
+            return False
+
+        dev_path, path = data
+        device = self.devices[dev_path]
+        # Should we auto-disconnect from the connected peer?  Show an "Are you
+        # sure?" dialog?
+        busy = (device.disconnecting_peer or device.connecting_peer or
+                not device.props['AvailableConnections'])
+        if not busy:
+            self.connect_peer(dev_path, path)
+        return True
+
+    def on_peer_button(self, widget, data):
+        """Run the action named by the clicked row button's label.
+
+        'Connect' starts a connection, 'Disconnect' and 'Cancel' both go through
+        disconnect_peer().  Nothing happens while the device is disconnecting
+        from a peer.  Always returns True.
+        """
+        dev_path, path = data
+        action = widget.get_label()
+        device = self.devices[dev_path]
+        handlers = {
+            'Connect': self.connect_peer,
+            'Disconnect': self.disconnect_peer,
+            'Cancel': self.disconnect_peer,
+        }
+        if not device.disconnecting_peer:
+            handler = handlers.get(action)
+            if handler is not None:
+                handler(dev_path, path)
+        return True
+
+    def get_peer_path(self, device, peer):
+        """Return the key under which *peer* is stored in device.peers.
+
+        Returns None when *peer* is not one of the device's peers.
+        """
+        # Compare against the peer that was asked for; the previous code ignored
+        # the argument and always looked for device.selected_peer.
+        for path, candidate in device.peers.items():
+            if candidate == peer:
+                return path
+        return None
+
+    def on_peer_selected(self, widget, row, dev_path):
+        """Track which peer row is selected in a device's peer list.
+
+        Deselects the previously selected peer (refreshing its row and clearing
+        the info pane) and, unless *row* is None, selects the peer whose widget
+        is *row*.
+        """
+        device = self.devices[dev_path]
+        previous = device.selected_peer
+
+        if previous is not None:
+            if previous.widget == row:
+                return True
+
+            old_path = self.get_peer_path(device, previous)
+            device.selected_peer = None
+            self.update_peer_props(dev_path, old_path)
+            self.update_info(dev_path, None)
+
+        if row is None:
+            return True
+
+        for path, peer in device.peers.items():
+            if peer.widget == row:
+                device.selected_peer = peer
+                self.update_peer_props(dev_path, path)
+        return True
+
+    def update_dev_scan_request(self, path):
+        """Request or release peer discovery on a device as needed.
+
+        Discovery is wanted while the device's expander is expanded and
+        self.is_active() is true.  The proxy is only called when that differs
+        from the recorded device.scan_request.
+        """
+        device = self.devices[path]
+        wanted = device.expanded and self.is_active()
+        if device.scan_request == wanted:
+            return
+
+        device.scan_request = wanted
+        proxy = device.dev_proxy
+        change_discovery = proxy.RequestDiscovery if wanted else proxy.ReleaseDiscovery
+        change_discovery()
+        self.update_dev_props(path)
+
+    def on_notify_is_active(self, window, value):
+        """Re-evaluate the discovery request of every known device.
+
+        Does nothing while self.devices is None.  Always returns True.
+        """
+        if self.devices is not None:
+            for dev_path in self.devices:
+                self.update_dev_scan_request(dev_path)
+        return True
+
+    def on_dev_enabled(self, switch, state, path):
+        """Forward a change of a device's enable switch to IWD.
+
+        Returns True after updating the cached 'Enabled' property and setting
+        it through the device's properties proxy, and None when *state*
+        already matches the cached value.
+        """
+        # NOTE(review): presumably a true return from a 'state-set' handler
+        # stops GTK's default handling of the switch -- confirm.
+        device = self.devices[path]
+        if device.props['Enabled'] != state:
+            device.props['Enabled'] = state
+            device.props_proxy.Set(DEVICE_IF, 'Enabled', state)
+            return True
+        return None
+
+    def on_dev_expanded(self, expander, value, path):
+        """Record the expander's new state and update the discovery request."""
+        self.devices[path].expanded = expander.get_expanded()
+        self.update_dev_scan_request(path)
+        return True
+
+    def get_dev_string(self, path):
+        """Return a description of the adapter at *path*.
+
+        Uses the first of the WIPHY_IF properties 'Model' and 'Vendor' that is
+        present and falls back to 'Name'.
+        """
+        wiphy = self.objects[path][WIPHY_IF]
+        for key in ('Model', 'Vendor'):
+            if key in wiphy:
+                return wiphy[key]
+        return wiphy['Name']
+
+    def on_destroy(self, widget, data):
+        """Clean up and stop the main loop when the window is destroyed.
+
+        While devices are known (self.devices is not None) the display service
+        is unregistered and on_name_owner_change('') is called.  The
+        module-level main loop is stopped in every case.  Returns False.
+        """
+        try:
+            if self.devices is not None:
+                iwd_obj = self.dbus.get_object('net.connman.iwd', '/net/connman/iwd')
+                svc_mgr = dbus.Interface(iwd_obj, SVC_MGR_IF)
+                svc_mgr.UnregisterDisplayService()
+                self.on_name_owner_change('')
+        finally:
+            # Quit even if the D-Bus clean-up raised, otherwise the process would
+            # keep running without a window.  mainloop is only read here, so no
+            # global declaration is needed.
+            mainloop.quit()
+        return False
+
+# Only start the GUI when run as a script so that importing the module has no
+# side effects.
+if __name__ == '__main__':
+    # Make the GLib main loop the default main loop for dbus-python
+    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+    Gst.init(None)
+    WFDSource()
+    # Module-level on purpose: WFDSource.on_destroy() quits this loop
+    mainloop = GLib.MainLoop()
+    mainloop.run()
--
2.25.1