Skip to content

Commit

Permalink
#1196: bundle muxer metadata with the sound packets if the other end …
Browse files Browse the repository at this point in the history
…supports it - in this context, the "other end" means the sound subprocess or the remote end of the network connection (client or server)

git-svn-id: https://xpra.org/svn/Xpra/trunk@12578 3bb7dfac-3a0b-4e04-842a-767bc560f471
  • Loading branch information
totaam committed May 13, 2016
1 parent 04742b4 commit 9169e99
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 27 deletions.
42 changes: 35 additions & 7 deletions src/xpra/client/ui_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from xpra.scripts.config import parse_bool_or_int, parse_bool, FALSE_OPTIONS, TRUE_OPTIONS
from xpra.simple_stats import std_unit
from xpra.net import compression, packet_encoding
from xpra.net.compression import Compressed
from xpra.child_reaper import reaper_cleanup
from xpra.make_thread import make_thread
from xpra.os_util import BytesIOClass, Queue, platform_name, get_machine_id, get_user_uuid, bytestostr
Expand Down Expand Up @@ -208,6 +209,7 @@ def __init__(self):
self.server_sound_encoders = []
self.server_sound_receive = False
self.server_sound_send = False
self.server_sound_bundle_metadata = False
self.server_ogg_latency_fix = False
self.queue_used_sent = None

Expand Down Expand Up @@ -1546,6 +1548,7 @@ def make_hello(self):
sound_caps["encoders"] = self.microphone_codecs
sound_caps["send"] = self.microphone_allowed
sound_caps["receive"] = self.speaker_allowed
sound_caps["bundle-metadata"] = self.sound_properties.get("bundle-metadata")
try:
from xpra.sound.pulseaudio.pulseaudio_util import get_info as get_pa_info
sound_caps.update(get_pa_info())
Expand Down Expand Up @@ -1870,6 +1873,7 @@ def process_ui_capabilities(self):
self.server_sound_encoders = c.strlistget("sound.encoders", [])
self.server_sound_receive = c.boolget("sound.receive")
self.server_sound_send = c.boolget("sound.send")
self.server_sound_bundle_metadata = c.boolget("sound.bundle-metadata")
self.server_ogg_latency_fix = c.boolget("sound.ogg-latency-fix", False)
soundlog("pulseaudio id=%s, server=%s, sound decoders=%s, sound encoders=%s, receive=%s, send=%s",
self.server_pulseaudio_id, self.server_pulseaudio_server,
Expand Down Expand Up @@ -2398,11 +2402,26 @@ def start_sound_sink(self, codec):
self.sound_sink_error(self.sound_sink, e)
return False

def new_sound_buffer(self, sound_source, data, metadata):
soundlog("new_sound_buffer(%s, %s, %s)", sound_source, len(data or []), metadata)
if self.sound_source:
self.sound_out_bytecount += len(data)
self.send("sound-data", self.sound_source.codec, compression.Compressed(self.sound_source.codec, data), metadata)
def new_sound_buffer(self, sound_source, data, metadata, packet_metadata=[]):
soundlog("new_sound_buffer(%s, %s, %s, %s)", sound_source, len(data or []), metadata, packet_metadata)
if not self.sound_source:
return
self.sound_out_bytecount += len(data)
for x in packet_metadata:
self.sound_out_bytecount += len(x)
if packet_metadata and not self.server_sound_bundle_metadata:
#server does not support bundling, send packet metadata as individual packets before the main packet:
for x in packet_metadata:
self.send_sound_data(sound_source, x)
packet_metadata = ()
self.send_sound_data(sound_source, data, metadata, packet_metadata)

def send_sound_data(self, sound_source, data, metadata={}, packet_metadata=()):
packet_data = [sound_source.codec, Compressed(sound_source.codec, data), metadata]
if packet_metadata:
assert self.server_sound_bundle_metadata
packet_data.append(packet_metadata)
self.send("sound-data", *packet_data)

def _process_sound_data(self, packet):
codec, data, metadata = packet[1:4]
Expand Down Expand Up @@ -2450,9 +2469,18 @@ def _process_sound_data(self, packet):
soundlog("sound data received, sound sink is stopped - telling server to stop")
self.stop_receiving_sound()
return
#the server may send packet_metadata, which is pushed before the actual sound data:
packet_metadata = ()
if len(packet)>4:
packet_metadata = packet[4]
if not self.sound_properties.get("bundle-metadata"):
#we don't handle bundling, so push individually:
for x in packet_metadata:
ss.add_data(x)
packet_metadata = ()
#(some packets (ie: sos, eos) only contain metadata)
if len(data)>0:
ss.add_data(data, metadata)
if len(data)>0 or packet_metadata:
ss.add_data(data, metadata, packet_metadata)
if self.av_sync and self.server_av_sync:
info = ss.get_info()
queue_used = info.get("queue.cur") or info.get("queue", {}).get("cur")
Expand Down
32 changes: 25 additions & 7 deletions src/xpra/server/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ def parse_batch_int(value, varname):
self.sound_encoders = c.strlistget("sound.encoders", [])
self.sound_receive = c.boolget("sound.receive")
self.sound_send = c.boolget("sound.send")
self.sound_bundle_metadata = c.boolget("sound.bundle-metadata")
av_sync = c.boolget("av-sync")
self.set_av_sync_delay(int(self.av_sync and av_sync) * c.intget("av-sync.delay.default", 150))
soundlog("pulseaudio id=%s, server=%s, sound decoders=%s, sound encoders=%s, receive=%s, send=%s",
Expand Down Expand Up @@ -980,18 +981,30 @@ def new_stream(self, sound_source, codec):
"sequence" : sound_source.sequence})
self.update_av_sync_delay_total()

def new_sound_buffer(self, sound_source, data, metadata):
soundlog("new_sound_buffer(%s, %s, %s) suspended=%s",
sound_source, len(data or []), metadata, self.suspended)
def new_sound_buffer(self, sound_source, data, metadata, packet_metadata=[]):
soundlog("new_sound_buffer(%s, %s, %s, %s) suspended=%s",
sound_source, len(data or []), metadata, [len(x) for x in packet_metadata], self.suspended)
if self.sound_source!=sound_source or self.is_closed():
soundlog("sound buffer dropped: from old source or closed")
return
if sound_source.sequence<self.sound_source_sequence:
soundlog("sound buffer dropped: old sequence number: %s (current is %s)", sound_source.sequence, self.sound_source_sequence)
return
if packet_metadata and not self.sound_bundle_metadata:
#client does not support bundling, send packet metadata as individual packets before the main packet:
for x in packet_metadata:
self.send_sound_data(sound_source, x)
packet_metadata = ()
self.send_sound_data(sound_source, data, metadata, packet_metadata)

def send_sound_data(self, sound_source, data, metadata={}, packet_metadata=()):
packet_data = [sound_source.codec, Compressed(sound_source.codec, data), metadata]
if packet_metadata:
assert self.sound_bundle_metadata
packet_data.append(packet_metadata)
if sound_source.sequence>=0:
metadata["sequence"] = sound_source.sequence
self.send("sound-data", sound_source.codec, Compressed(sound_source.codec, data), metadata)
self.send("sound-data", *packet_data)

def stop_receiving_sound(self):
ss = self.sound_sink
Expand Down Expand Up @@ -1083,8 +1096,8 @@ def fadeout():
log.error(msg)
return msg

def sound_data(self, codec, data, metadata, *args):
soundlog("sound_data(%s, %s, %s, %s) sound sink=%s", codec, len(data or []), metadata, args, self.sound_sink)
def sound_data(self, codec, data, metadata, packet_metadata=()):
soundlog("sound_data(%s, %s, %s, %s) sound sink=%s", codec, len(data or []), metadata, packet_metadata, self.sound_sink)
if self.is_closed():
return
if self.sound_sink is not None and codec!=self.sound_sink.codec:
Expand Down Expand Up @@ -1112,7 +1125,12 @@ def sound_sink_error(*args):
except Exception:
soundlog.error("failed to setup sound", exc_info=True)
return
self.sound_sink.add_data(data, metadata)
if packet_metadata:
if not self.sound_properties.get("bundle-metadata"):
for x in packet_metadata:
self.sound_sink.add_data(x)
packet_metadata = ()
self.sound_sink.add_data(data, metadata, packet_metadata)


def set_av_sync_delta(self, delta):
Expand Down
8 changes: 6 additions & 2 deletions src/xpra/sound/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ def can_push_buffer(self):
return False
return True

def add_data0(self, data, metadata=None):
def add_data0(self, data, metadata=None, packet_metadata=()):
if not self.can_push_buffer():
return
self.last_data = data
Expand All @@ -417,12 +417,16 @@ def fadein():
if delta<50:
gstlog("dropping sample to try to avoid overrun")
return
for x in packet_metadata:
self.do_add_data(x)
self.do_add_data(data, metadata)
self.emit_info()

def add_data1(self, data, metadata=None):
def add_data1(self, data, metadata=None, packet_metadata=()):
if not self.can_push_buffer():
return
for x in packet_metadata:
self.do_add_data(x)
self.do_add_data(data, metadata)
if self.queue_state=="pushing":
self.set_min_level()
Expand Down
35 changes: 27 additions & 8 deletions src/xpra/sound/src.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
BUFFER_TIME = int(os.environ.get("XPRA_SOUND_SOURCE_BUFFER_TIME", "0")) #ie: 64
LATENCY_TIME = int(os.environ.get("XPRA_SOUND_SOURCE_LATENCY_TIME", "0")) #ie: 32

BUNDLE_METADATA = os.environ.get("XPRA_SOUND_BUNDLE_METADATA", "1")=="1"

SAVE_TO_FILE = os.environ.get("XPRA_SAVE_TO_FILE")

generation = AtomicInteger()
Expand All @@ -35,7 +37,7 @@ class SoundSource(SoundPipeline):

__gsignals__ = SoundPipeline.__generic_signals__.copy()
__gsignals__.update({
"new-buffer" : n_arg_signal(2),
"new-buffer" : n_arg_signal(3),
})

def __init__(self, src_type=None, src_options={}, codecs=get_codecs(), codec_options={}, volume=1.0):
Expand Down Expand Up @@ -74,6 +76,7 @@ def __init__(self, src_type=None, src_options={}, codecs=get_codecs(), codec_opt
self.sink = None
self.src = None
self.src_type = src_type
self.pending_metadata = []
self.buffer_latency = False
self.jitter_queue = None
self.file = None
Expand Down Expand Up @@ -214,9 +217,15 @@ def emit_buffer1(self, sample):
from xpra.sound.gst_hacks import map_gst_buffer
with map_gst_buffer(buf) as a:
data = bytes(a[:])
return self.emit_buffer(data, {"timestamp" : normv(buf.pts),
"duration" : normv(buf.duration),
})
pts = normv(buf.pts)
duration = normv(buf.duration)
if pts==-1 and duration==-1 and BUNDLE_METADATA and len(self.pending_metadata)<10:
self.pending_metadata.append(data)
return 0
return self.emit_buffer(data, {
"timestamp" : pts,
"duration" : duration,
})


def on_new_preroll0(self, appsink):
Expand Down Expand Up @@ -273,8 +282,11 @@ def emit_buffer0(self, buf):

def emit_buffer(self, data, metadata={}):
f = self.file
if f and data:
self.file.write(data)
if f:
for x in self.pending_metadata:
self.file.write(x)
if data:
self.file.write(data)
self.file.flush()
if self.state=="stopped":
#don't bother
Expand All @@ -287,6 +299,8 @@ def emit_buffer(self, data, metadata={}):
jitter = randint(1, JITTER)
self.timeout_add(jitter, self.flush_jitter_queue)
log("emit_buffer: will flush jitter queue in %ims", jitter)
for x in self.pending_metadata:
self.jitter_queue.put((x, {}))
self.jitter_queue.put((data, metadata))
return 0
log("emit_buffer data=%s, len=%i, metadata=%s", type(data), len(data), metadata)
Expand All @@ -300,8 +314,11 @@ def flush_jitter_queue(self):
def do_emit_buffer(self, data, metadata={}):
self.inc_buffer_count()
self.inc_byte_count(len(data))
for x in self.pending_metadata:
self.inc_byte_count(len(x))
metadata["time"] = int(time.time()*1000)
self.idle_emit("new-buffer", data, metadata)
self.idle_emit("new-buffer", data, metadata, self.pending_metadata)
self.pending_metadata = []
self.emit_info()
return 0

Expand Down Expand Up @@ -363,10 +380,12 @@ def main():
f = open(filename, "wb")
ss = SoundSource(codecs=[codec])
lock = Lock()
def new_buffer(ss, data, metadata):
def new_buffer(ss, data, metadata, packet_metadata):
log.info("new buffer: %s bytes (%s), metadata=%s", len(data), type(data), metadata)
with lock:
if f:
for x in packet_metadata:
f.write(x)
f.write(data)
f.flush()

Expand Down
14 changes: 11 additions & 3 deletions src/xpra/sound/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
FAKE_EXIT = int(os.environ.get("XPRA_SOUND_FAKE_EXIT", "0"))
FAKE_CRASH = int(os.environ.get("XPRA_SOUND_FAKE_CRASH", "0"))
SOUND_START_TIMEOUT = int(os.environ.get("XPRA_SOUND_START_TIMEOUT", "3000"))
BUNDLE_METADATA = os.environ.get("XPRA_SOUND_BUNDLE_METADATA", "1")=="1"


def get_sound_wrapper_env():
Expand Down Expand Up @@ -155,6 +156,8 @@ def run_sound(mode, error_cb, options, args):
}
for k,v in d.items():
print("%s=%s" % (k, ",".join(str(x) for x in v)))
if BUNDLE_METADATA:
print("bundle-metadata=True")
return 0
else:
log.error("unknown mode: %s" % mode)
Expand Down Expand Up @@ -313,10 +316,15 @@ def __init__(self, plugin, options, codec, volume, element_options):
self.command = get_sound_command()+["_sound_play", "-", "-", plugin or "", format_element_options(element_options), codec, "", str(volume)]
_add_debug_args(self.command)

def add_data(self, data, metadata):
def add_data(self, data, metadata={}, packet_metadata=()):
if DEBUG_SOUND:
log("add_data(%s bytes, %s) forwarding to %s", len(data), metadata, self.protocol)
self.send("add_data", data, dict(metadata))
log("add_data(%s bytes, %s, %s) forwarding to %s", len(data), metadata, len(packet_metadata), self.protocol)
#theoretically, the sound process could be a different version,
#so don't add the extra packet_metadata argument unless we know we actually want it:
if packet_metadata:
self.send("add_data", data, dict(metadata), packet_metadata)
else:
self.send("add_data", data, dict(metadata))

def __repr__(self):
try:
Expand Down

0 comments on commit 9169e99

Please sign in to comment.