Skip to content

Commit

Permalink
#2022: always expire delayed regions reasonably quickly (defaults to …
Browse files Browse the repository at this point in the history
…50ms), even when the batch delay is high, but we only actually send those expired regions if there is no backlog at all or eventually when we have honoured the full batch delay - for clarity, move the damage delayed data (including expired flag) to a new class instead of an array and indices

git-svn-id: https://xpra.org/svn/Xpra/trunk@21299 3bb7dfac-3a0b-4e04-842a-767bc560f471
  • Loading branch information
totaam committed Jan 9, 2019
1 parent 15a5efd commit eb23061
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 41 deletions.
11 changes: 8 additions & 3 deletions src/xpra/server/window/batch_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# This file is part of Xpra.
# Copyright (C) 2011 Serviware (Arthur Huillet, <[email protected]>)
# Copyright (C) 2010-2017 Antoine Martin <[email protected]>
# Copyright (C) 2010-2019 Antoine Martin <[email protected]>
# Copyright (C) 2008 Nathaniel Smith <[email protected]>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.
Expand Down Expand Up @@ -45,6 +45,7 @@ class DamageBatchConfig(object):
MIN_DELAY = ival("MIN_DELAY", 5, 0, 1000) #lower than 5 milliseconds does not make sense, just don't batch
START_DELAY = ival("START_DELAY", 50, 1, 1000)
MAX_DELAY = ival("MAX_DELAY", 500, 1, 15000)
EXPIRE_DELAY = ival("EXPIRE_DELAY", 50, 10, 1000)
TIMEOUT_DELAY = ival("TIMEOUT_DELAY", 15000, 100, 100000)

def __init__(self):
Expand All @@ -56,6 +57,7 @@ def __init__(self):
self.min_delay = self.MIN_DELAY
self.max_delay = self.MAX_DELAY
self.timeout_delay = self.TIMEOUT_DELAY
self.expire_delay = self.EXPIRE_DELAY
self.delay = self.START_DELAY
self.saved = self.START_DELAY
self.locked = False #to force a specific delay
Expand All @@ -74,6 +76,7 @@ def get_info(self):
info = {
"min-delay" : self.min_delay,
"max-delay" : self.max_delay,
"expire" : self.expire_delay,
"timeout-delay" : self.timeout_delay,
"locked" : self.locked,
}
Expand All @@ -97,8 +100,10 @@ def get_info(self):

def clone(self):
c = DamageBatchConfig()
for x in ["always", "max_events", "max_pixels", "time_unit",
"min_delay", "max_delay", "timeout_delay", "delay"]:
for x in [
"always", "max_events", "max_pixels", "time_unit",
"min_delay", "max_delay", "timeout_delay", "delay", "expire_delay",
]:
setattr(c, x, getattr(self, x))
return c

Expand Down
90 changes: 58 additions & 32 deletions src/xpra/server/window/window_source.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# This file is part of Xpra.
# Copyright (C) 2011 Serviware (Arthur Huillet, <[email protected]>)
# Copyright (C) 2010-2018 Antoine Martin <[email protected]>
# Copyright (C) 2010-2019 Antoine Martin <[email protected]>
# Copyright (C) 2008 Nathaniel Smith <[email protected]>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.
Expand All @@ -13,7 +13,7 @@
from collections import deque

from xpra.os_util import monotonic_time
from xpra.util import envint, envbool, csv, typedict
from xpra.util import envint, envbool, csv, typedict, first_time
from xpra.log import Logger
log = Logger("window", "encoding")
refreshlog = Logger("window", "refresh")
Expand Down Expand Up @@ -87,6 +87,15 @@
REFRESH_ENCODINGS = ("webp", "png", "rgb24", "rgb32", "jpeg2000")


class DelayedRegions(object):
def __init__(self, damage_time, regions, encoding, options):
self.expired = False
self.damage_time = damage_time
self.regions = regions
self.encoding = encoding
self.options = options or {}


def capr(v):
return min(100, max(0, int(v)))

Expand Down Expand Up @@ -365,7 +374,6 @@ def init_vars(self):
self._fixed_min_speed = 0
#
self._damage_delayed = None
self._damage_delayed_expired = False
self._sequence = 1
self._damage_cancelled = 0
self._damage_packet_sequence = 1
Expand Down Expand Up @@ -903,7 +911,6 @@ def cancel_damage(self):
#if a region was delayed, we can just drop it now:
self.refresh_regions = []
self._damage_delayed = None
self._damage_delayed_expired = False
self.delta_pixel_data = [None for _ in range(self.delta_buckets)]
#make sure we don't account for those as they will get dropped
#(generally before encoding - only one may still get encoded):
Expand Down Expand Up @@ -1224,24 +1231,24 @@ def do_damage(self, ww, wh, x, y, w, h, options):
delayed = self._damage_delayed
if delayed:
#use existing delayed region:
regions = delayed.regions
if not self.full_frames_only:
regions = delayed[1]
region = rectangle(x, y, w, h)
add_rectangle(regions, region)
#merge/override options
if options is not None:
override = options.get("override_options", False)
existing_options = delayed[3]
existing_options = delayed.options
for k in options.keys():
if k=="override_options":
continue
if override or k not in existing_options:
existing_options[k] = options[k]
damagelog("do_damage%-24s wid=%s, using existing delayed %s regions created %.1fms ago",
(x, y, w, h, options), self.wid, delayed[3], now-delayed[0])
damagelog("do_damage%-24s wid=%s, using existing %i delayed regions created %.1fms ago",
(x, y, w, h, options), self.wid, len(regions), now-delayed.damage_time)
if not self.expire_timer and not self.soft_timer and self.soft_expired==0:
log.error("Error: bug, found a delayed region without a timer!")
self.expire_timer = self.timeout_add(0, self.expire_delayed_region, 0)
self.expire_timer = self.timeout_add(0, self.expire_delayed_region)
return

delay = options.get("delay", self.batch_config.delay)
Expand Down Expand Up @@ -1288,12 +1295,13 @@ def damage_now():

#create a new delayed region:
regions = [rectangle(x, y, w, h)]
self._damage_delayed_expired = False
actual_encoding = options.get("encoding", self.encoding)
self._damage_delayed = now, regions, actual_encoding, options or {}
self._damage_delayed = DelayedRegions(now, regions, actual_encoding, options)
damagelog("do_damage%-24s wid=%s, scheduling batching expiry for sequence %s in %i ms", (x, y, w, h, options), self.wid, self._sequence, delay)
self.batch_config.last_delays.append((now, delay))
self.expire_timer = self.timeout_add(delay, self.expire_delayed_region, delay)
expire_delay = max(self.batch_config.min_delay, min(self.batch_config.expire_delay, delay))
due = now+delay
self.expire_timer = self.timeout_add(expire_delay, self.expire_delayed_region, due)

def must_batch(self, delay):
if FORCE_BATCH:
Expand Down Expand Up @@ -1338,7 +1346,7 @@ def must_batch(self, delay):
#do batch if we got more than 5 damage events in the last 10 milliseconds:
return monotonic_time()-t<0.010
except:
#probably not enough events to grab -10
#probably not enough events to grab -5
return False

def get_packets_backlog(self):
Expand All @@ -1349,30 +1357,42 @@ def get_packets_backlog(self):
latency_tolerance_pct = int(min(self._damage_packet_sequence, 10)*min(monotonic_time()-gs.last_congestion_time, 10))
return s.get_packets_backlog(latency_tolerance_pct)

def expire_delayed_region(self, delay):
def expire_delayed_region(self, due=0):
""" mark the region as expired so damage_packet_acked can send it later,
and try to send it now.
"""
self.expire_timer = None
self._damage_delayed_expired = True
delayed = self._damage_delayed
if not delayed:
#region has been sent
return False
if self.soft_timer:
#a soft timer will take care of it soon
return False
delayed.expired = True
self.cancel_may_send_timer()
self.may_send_delayed()
delayed = self._damage_delayed
if not delayed:
#got sent
return False
now = monotonic_time()
if now<due:
#not due yet, don't allow soft expiry, just try again later:
delay = int(1000*(due-now))
expire_delay = max(self.batch_config.min_delay, min(self.batch_config.expire_delay, delay))
self.expire_timer = self.timeout_add(expire_delay, self.expire_delayed_region, due)
return False
#the region has not been sent yet because we are waiting for damage ACKs from the client
max_soft_expired = min(1+self.statistics.damage_events_count//2, self.max_soft_expired)
if self.soft_expired<max_soft_expired:
#there aren't too many regions soft expired yet
#so use the "soft timer":
self.soft_expired += 1
#we have already waited for "delay" to get here, wait more as we soft expire more regions:
self.soft_timer = self.timeout_add(int(self.soft_expired*delay), self.delayed_region_soft_timeout)
#we have already waited for "expire delay" to get here,
#wait gradually more as we soft expire more regions:
soft_delay = self.soft_expired*self.batch_config.expire_delay
self.soft_timer = self.timeout_add(soft_delay, self.delayed_region_soft_timeout)
else:
if max_soft_expired==self.max_soft_expired:
#only record this congestion if this is a new event,
Expand All @@ -1382,12 +1402,13 @@ def expire_delayed_region(self, delay):
celapsed = monotonic_time()-self.global_statistics.last_congestion_time
if celapsed<10:
late_pct = 2*100*self.soft_expired
delay = now-due
self.networksend_congestion_event("soft-expire limit: %ims, %i/%i" % (delay, self.soft_expired, self.max_soft_expired), late_pct)
#NOTE: this should never happen...
#the region should now get sent when we eventually receive the pending ACKs
#but if somehow they go missing... clean it up from a timeout:
if not self.timeout_timer:
delayed_region_time = delayed[0]
delayed_region_time = delayed.damage_time
self.timeout_timer = self.timeout_add(self.batch_config.timeout_delay, self.delayed_region_timeout, delayed_region_time)
return False

Expand All @@ -1402,16 +1423,16 @@ def delayed_region_timeout(self, delayed_region_time):
if delayed is None:
#delayed region got sent
return False
region_time = delayed[0]
region_time = delayed.damage_time
if region_time!=delayed_region_time:
#this is a different region
return False
#ouch: same region!
now = monotonic_time()
options = delayed[3]
elapsed = int(1000.0 * (now - region_time))
options = delayed.options
elapsed = int(1000 * (now - region_time))
log.warn("Warning: delayed region timeout")
log.warn(" region is %i seconds old, will retry - bad connection?", elapsed/1000)
log.warn(" region is %i seconds old, will retry - bad connection?", elapsed//1000)
dap = dict(self.statistics.damage_ack_pending)
if dap:
log.warn(" %i late responses:", len(dap))
Expand Down Expand Up @@ -1442,13 +1463,17 @@ def may_send_delayed(self):
if not dd:
log("window %s delayed region already sent", self.wid)
return
damage_time = dd[0]
damage_time = dd.damage_time
packets_backlog = self.get_packets_backlog()
now = monotonic_time()
actual_delay = int(1000 * (now-damage_time))
if packets_backlog>0:
if actual_delay>self.batch_config.timeout_delay:
log.warn("send_delayed for wid %s, elapsed time %ims is above limit of %.1f", self.wid, actual_delay, self.batch_config.max_delay)
log("send_delayed for wid %s, elapsed time %ims is above limit of %.1f", self.wid, actual_delay, self.batch_config.timeout_delay)
key = ("timeout-damage-delay", self.wid, damage_time)
if first_time(key):
log.warn("Warning: timeout on screen updates for window %i,", self.wid)
log.warn(" already delayed for more than %i seconds", actual_delay//1000)
return
log("send_delayed for wid %s, delaying again because of backlog: %s packets, batch delay is %i, elapsed time is %ims",
self.wid, packets_backlog, self.batch_config.delay, actual_delay)
Expand Down Expand Up @@ -1502,14 +1527,13 @@ def do_send_delayed(self):
delayed = self._damage_delayed
if delayed:
self._damage_delayed = None
damage_time = delayed[0]
now = monotonic_time()
actual_delay = int(1000 * (now-damage_time))
actual_delay = int(1000 * (now-delayed.damage_time))
self.batch_config.last_actual_delays.append((now, actual_delay))
self.send_delayed_regions(*delayed)
self.send_delayed_regions(delayed)
return False

def send_delayed_regions(self, damage_time, regions, coding, options):
def send_delayed_regions(self, delayed_regions):
""" Called by 'send_delayed' when we expire a delayed region,
There may be many rectangles within this delayed region,
so figure out if we want to send them all or if we
Expand All @@ -1523,7 +1547,8 @@ def send_delayed_regions(self, damage_time, regions, coding, options):
self.window.acknowledge_changes()
self.batch_config.last_event = monotonic_time()
if not self.is_cancelled():
self.do_send_delayed_regions(damage_time, regions, coding, options)
dr = delayed_regions
self.do_send_delayed_regions(dr.damage_time, dr.regions, dr.encoding, dr.options)

def do_send_delayed_regions(self, damage_time, regions, coding, options, exclude_region=None, get_best_encoding=None):
ww,wh = self.window_dimensions
Expand Down Expand Up @@ -2040,15 +2065,16 @@ def damage_packet_acked(self, damage_packet_sequence, width, height, decode_time
send_speed = bytecount*8*1000//actual_send_latency
#statslog("send latency: expected up to %3i, got %3i, %6iKB sent in %3i ms: %5iKbps", latency, actual, bytecount//1024, actual_send_latency, send_speed//1024)
self.networksend_congestion_event("late-ack for sequence %6i: late by %3ims, target latency=%3i (%s)" % (damage_packet_sequence, late_by, latency, (netlatency, sendlatency, decode_time, ack_tolerance)), late_pct, send_speed)
if self._damage_delayed is not None and self._damage_delayed_expired:
damage_delayed = self._damage_delayed
if not damage_delayed:
self.soft_expired = 0
elif damage_delayed.expired:
def call_may_send_delayed():
self.cancel_may_send_timer()
self.may_send_delayed()
#this function is called from the network thread,
#call via idle_add to prevent race conditions:
self.idle_add(call_may_send_delayed)
if not self._damage_delayed:
self.soft_expired = 0

def client_decode_error(self, error, message):
#don't print error code -1, which is just a generic code for error
Expand Down
13 changes: 7 additions & 6 deletions src/xpra/server/window/window_video_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from xpra.net.compression import Compressed, LargeStructure
from xpra.codecs.codec_constants import TransientCodecException, RGB_FORMATS, PIXEL_SUBSAMPLING
from xpra.server.window.window_source import WindowSource, STRICT_MODE, AUTO_REFRESH_SPEED, AUTO_REFRESH_QUALITY, MAX_RGB
from xpra.server.window.window_source import WindowSource, DelayedRegions, STRICT_MODE, AUTO_REFRESH_SPEED, AUTO_REFRESH_QUALITY, MAX_RGB
from xpra.server.window.region import merge_all #@UnresolvedImport
from xpra.server.window.motion import ScrollData #@UnresolvedImport
from xpra.server.window.video_subregion import VideoSubregion, VIDEO_SUBREGION
Expand Down Expand Up @@ -782,8 +782,8 @@ def send_nonvideo(regions=regions, encoding=coding, exclude_region=None, get_bes
#(this codepath can fire from a video region refresh callback)
dr = self._damage_delayed
if dr:
regions = dr[1] + regions
damage_time = min(damage_time, dr[0])
regions = dr.regions + regions
damage_time = min(damage_time, dr.damage_time)
self._damage_delayed = None
self.cancel_expire_timer()
#decide if we want to send the rest now or delay some more,
Expand All @@ -794,14 +794,15 @@ def send_nonvideo(regions=regions, encoding=coding, exclude_region=None, get_bes
else:
#non-video is delayed at least 50ms, 4 times the batch delay, but no more than non_max_wait:
elapsed = int(1000.0*(monotonic_time()-damage_time))
delay = max(self.batch_config.delay*4, 50)
delay = max(self.batch_config.delay*4, self.batch_config.expire_delay)
delay = min(delay, self.video_subregion.non_max_wait-elapsed)
delay = int(delay)
if delay<=25:
send_nonvideo(regions=regions, encoding=None, exclude_region=actual_vr)
else:
self._damage_delayed = damage_time, regions, coding, options or {}
self._damage_delayed = DelayedRegions(damage_time, regions, coding, options=options)
sublog("do_send_delayed_regions: delaying non video regions %s some more by %ims", regions, delay)
self.expire_timer = self.timeout_add(int(delay), self.expire_delayed_region, delay)
self.expire_timer = self.timeout_add(delay, self.expire_delayed_region)

def must_encode_full_frame(self, encoding):
return self.full_frames_only or (encoding in self.video_encodings) or not self.non_video_encodings
Expand Down

0 comments on commit eb23061

Please sign in to comment.