From 9169e99cc1452dce2b5c4ef5d7425bb8987d4d3d Mon Sep 17 00:00:00 2001 From: Antoine Martin Date: Fri, 13 May 2016 11:44:14 +0000 Subject: [PATCH] #1196: bundle muxer metadata with the sound packets if the other end supports it - in this context, the "other end" means the sound subprocess or the remote end of the network connection (client or server) git-svn-id: https://xpra.org/svn/Xpra/trunk@12578 3bb7dfac-3a0b-4e04-842a-767bc560f471 --- src/xpra/client/ui_client_base.py | 42 +++++++++++++++++++++++++------ src/xpra/server/source.py | 32 +++++++++++++++++------ src/xpra/sound/sink.py | 8 ++++-- src/xpra/sound/src.py | 35 ++++++++++++++++++++------ src/xpra/sound/wrapper.py | 14 ++++++++--- 5 files changed, 104 insertions(+), 27 deletions(-) diff --git a/src/xpra/client/ui_client_base.py b/src/xpra/client/ui_client_base.py index 1db26c6486..aadf1dd1c4 100644 --- a/src/xpra/client/ui_client_base.py +++ b/src/xpra/client/ui_client_base.py @@ -53,6 +53,7 @@ from xpra.scripts.config import parse_bool_or_int, parse_bool, FALSE_OPTIONS, TRUE_OPTIONS from xpra.simple_stats import std_unit from xpra.net import compression, packet_encoding +from xpra.net.compression import Compressed from xpra.child_reaper import reaper_cleanup from xpra.make_thread import make_thread from xpra.os_util import BytesIOClass, Queue, platform_name, get_machine_id, get_user_uuid, bytestostr @@ -208,6 +209,7 @@ def __init__(self): self.server_sound_encoders = [] self.server_sound_receive = False self.server_sound_send = False + self.server_sound_bundle_metadata = False self.server_ogg_latency_fix = False self.queue_used_sent = None @@ -1546,6 +1548,7 @@ def make_hello(self): sound_caps["encoders"] = self.microphone_codecs sound_caps["send"] = self.microphone_allowed sound_caps["receive"] = self.speaker_allowed + sound_caps["bundle-metadata"] = self.sound_properties.get("bundle-metadata") try: from xpra.sound.pulseaudio.pulseaudio_util import get_info as get_pa_info sound_caps.update(get_pa_info()) @@ -1870,6 +1873,7 @@ def process_ui_capabilities(self): self.server_sound_encoders = c.strlistget("sound.encoders", []) self.server_sound_receive = c.boolget("sound.receive") self.server_sound_send = c.boolget("sound.send") + self.server_sound_bundle_metadata = c.boolget("sound.bundle-metadata") self.server_ogg_latency_fix = c.boolget("sound.ogg-latency-fix", False) soundlog("pulseaudio id=%s, server=%s, sound decoders=%s, sound encoders=%s, receive=%s, send=%s", self.server_pulseaudio_id, self.server_pulseaudio_server, @@ -2398,11 +2402,26 @@ def start_sound_sink(self, codec): self.sound_sink_error(self.sound_sink, e) return False - def new_sound_buffer(self, sound_source, data, metadata): - soundlog("new_sound_buffer(%s, %s, %s)", sound_source, len(data or []), metadata) - if self.sound_source: - self.sound_out_bytecount += len(data) - self.send("sound-data", self.sound_source.codec, compression.Compressed(self.sound_source.codec, data), metadata) + def new_sound_buffer(self, sound_source, data, metadata, packet_metadata=[]): + soundlog("new_sound_buffer(%s, %s, %s, %s)", sound_source, len(data or []), metadata, packet_metadata) + if not self.sound_source: + return + self.sound_out_bytecount += len(data) + for x in packet_metadata: + self.sound_out_bytecount += len(x) + if packet_metadata and not self.server_sound_bundle_metadata: + #server does not support bundling, send packet metadata as individual packets before the main packet: + for x in packet_metadata: + self.send_sound_data(sound_source, x) + packet_metadata = () + self.send_sound_data(sound_source, data, metadata, packet_metadata) + + def send_sound_data(self, sound_source, data, metadata={}, packet_metadata=()): + packet_data = [sound_source.codec, Compressed(sound_source.codec, data), metadata] + if packet_metadata: + assert self.server_sound_bundle_metadata + packet_data.append(packet_metadata) + self.send("sound-data", *packet_data) def _process_sound_data(self, packet): codec, data, metadata = packet[1:4] @@ -2450,9 +2469,18 @@ def _process_sound_data(self, packet): soundlog("sound data received, sound sink is stopped - telling server to stop") self.stop_receiving_sound() return + #the server may send packet_metadata, which is pushed before the actual sound data: + packet_metadata = () + if len(packet)>4: + packet_metadata = packet[4] + if not self.sound_properties.get("bundle-metadata"): + #we don't handle bundling, so push individually: + for x in packet_metadata: + ss.add_data(x) + packet_metadata = () #(some packets (ie: sos, eos) only contain metadata) - if len(data)>0: - ss.add_data(data, metadata) + if len(data)>0 or packet_metadata: + ss.add_data(data, metadata, packet_metadata) if self.av_sync and self.server_av_sync: info = ss.get_info() queue_used = info.get("queue.cur") or info.get("queue", {}).get("cur") diff --git a/src/xpra/server/source.py b/src/xpra/server/source.py index 7cd924af3b..2fcedca360 100644 --- a/src/xpra/server/source.py +++ b/src/xpra/server/source.py @@ -713,6 +713,7 @@ def parse_batch_int(value, varname): self.sound_encoders = c.strlistget("sound.encoders", []) self.sound_receive = c.boolget("sound.receive") self.sound_send = c.boolget("sound.send") + self.sound_bundle_metadata = c.boolget("sound.bundle-metadata") av_sync = c.boolget("av-sync") self.set_av_sync_delay(int(self.av_sync and av_sync) * c.intget("av-sync.delay.default", 150)) soundlog("pulseaudio id=%s, server=%s, sound decoders=%s, sound encoders=%s, receive=%s, send=%s", @@ -980,18 +981,30 @@ def new_stream(self, sound_source, codec): "sequence" : sound_source.sequence}) self.update_av_sync_delay_total() - def new_sound_buffer(self, sound_source, data, metadata): - soundlog("new_sound_buffer(%s, %s, %s) suspended=%s", - sound_source, len(data or []), metadata, self.suspended) + def new_sound_buffer(self, sound_source, data, metadata, packet_metadata=[]): + soundlog("new_sound_buffer(%s, %s, %s, %s) suspended=%s", + sound_source, len(data or []), metadata, [len(x) for x in packet_metadata], self.suspended) if self.sound_source!=sound_source or self.is_closed(): soundlog("sound buffer dropped: from old source or closed") return if sound_source.sequence=0: metadata["sequence"] = sound_source.sequence - self.send("sound-data", sound_source.codec, Compressed(sound_source.codec, data), metadata) + self.send("sound-data", *packet_data) def stop_receiving_sound(self): ss = self.sound_sink @@ -1083,8 +1096,8 @@ def fadeout(): log.error(msg) return msg - def sound_data(self, codec, data, metadata, *args): - soundlog("sound_data(%s, %s, %s, %s) sound sink=%s", codec, len(data or []), metadata, args, self.sound_sink) + def sound_data(self, codec, data, metadata, packet_metadata=()): + soundlog("sound_data(%s, %s, %s, %s) sound sink=%s", codec, len(data or []), metadata, packet_metadata, self.sound_sink) if self.is_closed(): return if self.sound_sink is not None and codec!=self.sound_sink.codec: @@ -1112,7 +1125,12 @@ def sound_sink_error(*args): except Exception: soundlog.error("failed to setup sound", exc_info=True) return - self.sound_sink.add_data(data, metadata) + if packet_metadata: + if not self.sound_properties.get("bundle-metadata"): + for x in packet_metadata: + self.sound_sink.add_data(x) + packet_metadata = () + self.sound_sink.add_data(data, metadata, packet_metadata) def set_av_sync_delta(self, delta): diff --git a/src/xpra/sound/sink.py b/src/xpra/sound/sink.py index 33374841aa..b273f0a22c 100755 --- a/src/xpra/sound/sink.py +++ b/src/xpra/sound/sink.py @@ -390,7 +390,7 @@ def can_push_buffer(self): return False return True - def add_data0(self, data, metadata=None): + def add_data0(self, data, metadata=None, packet_metadata=()): if not self.can_push_buffer(): return self.last_data = data @@ -417,12 +417,16 @@ def fadein(): if delta<50: gstlog("dropping sample to try to avoid overrun") return + for x in packet_metadata: + self.do_add_data(x) self.do_add_data(data, metadata) self.emit_info() - def add_data1(self, data, metadata=None): + def add_data1(self, data, metadata=None, packet_metadata=()): if not self.can_push_buffer(): return + for x in packet_metadata: + self.do_add_data(x) self.do_add_data(data, metadata) if self.queue_state=="pushing": self.set_min_level() diff --git a/src/xpra/sound/src.py b/src/xpra/sound/src.py index 0f4b6c16a5..76f0850d33 100755 --- a/src/xpra/sound/src.py +++ b/src/xpra/sound/src.py @@ -26,6 +26,8 @@ BUFFER_TIME = int(os.environ.get("XPRA_SOUND_SOURCE_BUFFER_TIME", "0")) #ie: 64 LATENCY_TIME = int(os.environ.get("XPRA_SOUND_SOURCE_LATENCY_TIME", "0")) #ie: 32 +BUNDLE_METADATA = os.environ.get("XPRA_SOUND_BUNDLE_METADATA", "1")=="1" + SAVE_TO_FILE = os.environ.get("XPRA_SAVE_TO_FILE") generation = AtomicInteger() @@ -35,7 +37,7 @@ class SoundSource(SoundPipeline): __gsignals__ = SoundPipeline.__generic_signals__.copy() __gsignals__.update({ - "new-buffer" : n_arg_signal(2), + "new-buffer" : n_arg_signal(3), }) def __init__(self, src_type=None, src_options={}, codecs=get_codecs(), codec_options={}, volume=1.0): @@ -74,6 +76,7 @@ def __init__(self, src_type=None, src_options={}, codecs=get_codecs(), codec_opt self.sink = None self.src = None self.src_type = src_type + self.pending_metadata = [] self.buffer_latency = False self.jitter_queue = None self.file = None @@ -214,9 +217,15 @@ def emit_buffer1(self, sample): from xpra.sound.gst_hacks import map_gst_buffer with map_gst_buffer(buf) as a: data = bytes(a[:]) - return self.emit_buffer(data, {"timestamp" : normv(buf.pts), - "duration" : normv(buf.duration), - }) + pts = normv(buf.pts) + duration = normv(buf.duration) + if pts==-1 and duration==-1 and BUNDLE_METADATA and len(self.pending_metadata)<10: + self.pending_metadata.append(data) + return 0 + return self.emit_buffer(data, { + "timestamp" : pts, + "duration" : duration, + }) def on_new_preroll0(self, appsink): @@ -273,8 +282,11 @@ def emit_buffer0(self, buf): def emit_buffer(self, data, metadata={}): f = self.file - if f and data: - self.file.write(data) + if f: + for x in self.pending_metadata: + self.file.write(x) + if data: + self.file.write(data) self.file.flush() if self.state=="stopped": #don't bother @@ -287,6 +299,8 @@ def emit_buffer(self, data, metadata={}): jitter = randint(1, JITTER) self.timeout_add(jitter, self.flush_jitter_queue) log("emit_buffer: will flush jitter queue in %ims", jitter) + for x in self.pending_metadata: + self.jitter_queue.put((x, {})) self.jitter_queue.put((data, metadata)) return 0 log("emit_buffer data=%s, len=%i, metadata=%s", type(data), len(data), metadata) @@ -300,8 +314,11 @@ def flush_jitter_queue(self): def do_emit_buffer(self, data, metadata={}): self.inc_buffer_count() self.inc_byte_count(len(data)) + for x in self.pending_metadata: + self.inc_byte_count(len(x)) metadata["time"] = int(time.time()*1000) - self.idle_emit("new-buffer", data, metadata) + self.idle_emit("new-buffer", data, metadata, self.pending_metadata) + self.pending_metadata = [] self.emit_info() return 0 @@ -363,10 +380,12 @@ def main(): f = open(filename, "wb") ss = SoundSource(codecs=[codec]) lock = Lock() - def new_buffer(ss, data, metadata): + def new_buffer(ss, data, metadata, packet_metadata): log.info("new buffer: %s bytes (%s), metadata=%s", len(data), type(data), metadata) with lock: if f: + for x in packet_metadata: + f.write(x) f.write(data) f.flush() diff --git a/src/xpra/sound/wrapper.py b/src/xpra/sound/wrapper.py index 8dc042b361..2f817b6315 100644 --- a/src/xpra/sound/wrapper.py +++ b/src/xpra/sound/wrapper.py @@ -22,6 +22,7 @@ FAKE_EXIT = int(os.environ.get("XPRA_SOUND_FAKE_EXIT", "0")) FAKE_CRASH = int(os.environ.get("XPRA_SOUND_FAKE_CRASH", "0")) SOUND_START_TIMEOUT = int(os.environ.get("XPRA_SOUND_START_TIMEOUT", "3000")) +BUNDLE_METADATA = os.environ.get("XPRA_SOUND_BUNDLE_METADATA", "1")=="1" def get_sound_wrapper_env(): @@ -155,6 +156,8 @@ def run_sound(mode, error_cb, options, args): } for k,v in d.items(): print("%s=%s" % (k, ",".join(str(x) for x in v))) + if BUNDLE_METADATA: + print("bundle-metadata=True") return 0 else: log.error("unknown mode: %s" % mode) @@ -313,10 +316,15 @@ def __init__(self, plugin, options, codec, volume, element_options): self.command = get_sound_command()+["_sound_play", "-", "-", plugin or "", format_element_options(element_options), codec, "", str(volume)] _add_debug_args(self.command) - def add_data(self, data, metadata): + def add_data(self, data, metadata={}, packet_metadata=()): if DEBUG_SOUND: - log("add_data(%s bytes, %s) forwarding to %s", len(data), metadata, self.protocol) - self.send("add_data", data, dict(metadata)) + log("add_data(%s bytes, %s, %s) forwarding to %s", len(data), metadata, len(packet_metadata), self.protocol) + #theoretically, the sound process could be a different version, + #so don't add the extra packet_metadata argument unless we know we actually want it: + if packet_metadata: + self.send("add_data", data, dict(metadata), packet_metadata) + else: + self.send("add_data", data, dict(metadata)) def __repr__(self): try: