Skip to content

Commit

Permalink
braid smith
Browse files Browse the repository at this point in the history
  • Loading branch information
AZ-X committed May 28, 2021
1 parent 2d44dfd commit 8eeaf2c
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 146 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/BurntSushi/toml v0.3.1
github.com/RobinUS2/golang-moving-average v1.0.0
github.com/jedisct1/dlog v0.0.0-20190909160351-692385b00b84
github.com/jedisct1/go-clocksmith v0.0.0-20190707124905-73e087c7979c
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)
Expand Down
53 changes: 25 additions & 28 deletions repique/behaviors/fswatcher_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,38 +153,35 @@ func fswatcher_init () {
panic(err)
}
go fswatcher_loop()
for {
select {
case reg := <- register:
if path0, err := filepath.Abs(reg.filename); err != nil {
for reg := range register {
if path0, err := filepath.Abs(reg.filename); err != nil {
panic("fswatcher_init failed:" + err.Error())
} else {
dir := fixLongPath(path.Dir(path0))
var fs0 fs
if info, err := os.Stat(path0); err != nil {
panic("fswatcher_init failed:" + err.Error())
} else {
dir := fixLongPath(path.Dir(path0))
var fs0 fs
if info, err := os.Stat(path0); err != nil {
fs0 = fs{filepath:path0, lastinfo:&atomic.Value{},callback:reg.callback}
fs0.lastinfo.Store(info)
}
var found bool
for _, w := range fswatcher {
if w.folder == dir {
found = true
w.files = append(w.files, fs0)
break
}
}
if !found {
path_ptr, _ := syscall.UTF16PtrFromString(dir)
if handle, err := findFirstChangeNotification(path_ptr, false, syscall.FILE_NOTIFY_CHANGE_LAST_WRITE); err != nil {
panic("fswatcher_init failed:" + err.Error())
} else {
fs0 = fs{filepath:path0, lastinfo:&atomic.Value{},callback:reg.callback}
fs0.lastinfo.Store(info)
}
var found bool
for _, w := range fswatcher {
if w.folder == dir {
found = true
w.files = append(w.files, fs0)
break
}
}
if !found {
path_ptr, _ := syscall.UTF16PtrFromString(dir)
if handle, err := findFirstChangeNotification(path_ptr, false, syscall.FILE_NOTIFY_CHANGE_LAST_WRITE); err != nil {
panic("fswatcher_init failed:" + err.Error())
} else {
w := watcher{folder:dir, files:[]fs{fs0}, win_handle:handle}
fswatcher = append(fswatcher, w)
if !setEvent(reset) {
panic("fswatcher_init failed to call SetEvent")
}
w := watcher{folder:dir, files:[]fs{fs0}, win_handle:handle}
fswatcher = append(fswatcher, w)
if !setEvent(reset) {
panic("fswatcher_init failed to call SetEvent")
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions repique/features/dns/nodes/dnscrypt.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ type dnscryptnode struct {
bs_relays []*common.Endpoint
}

func (n *dnscryptnode) boost(o *node) *uint32 {
func (n *dnscryptnode) boost(o *node) interface{} {
if o.status&status_outdated == status_outdated || n.GetDefaultExpiration().Before(time.Now()) {
relays := n.bs_relays
if _, err := dnscrypt.RetrieveServicesInfo(false, n.Resolver, n.dailFn, n.Network, n.ipaddr, &relays); err == nil {
n.bs2epring()
expired := uint32(n.GetDefaultExpiration().Unix())
n.bs2epring(relays)
expired := n.GetDefaultExpiration()
return &expired
} else {
dlog.Debugf("dnscrypt boost failed, %v", err)
Expand All @@ -37,8 +37,8 @@ func (n *dnscryptnode) boost(o *node) *uint32 {
return nil
}

func (n *dnscryptnode) bs2epring() {
if epring := common.LinkEPRing(n.bs_relays...); epring != nil {
func (n *dnscryptnode) bs2epring(eps []*common.Endpoint) {
if epring := common.LinkEPRing(eps...); epring != nil {
epring.Do(func(v interface{}){
dlog.Infof("relay [%s*%s]=%s", *n.Name, v.(*common.EPRing).Order(), v.(*common.EPRing).String())
})
Expand Down Expand Up @@ -75,7 +75,7 @@ func (n *dnscryptnode) unmarshal(ss *struct{c uint8; v string}) *time.Time {
} else {
n.V2_Services = append(n.V2_Services, s)
}
n.bs2epring()
n.bs2epring(n.bs_relays)
to := time.Unix(int64(s.DtTo), 0)
return &to
}
Expand Down
3 changes: 2 additions & 1 deletion repique/features/dns/nodes/materials.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,13 @@ func (m *materials) open(path string, identity []byte) {
}
}

func (m *materials) unmarshalto(items []marshalable) (updated []marshalable) {
func (m *materials) unmarshalto(items []marshalable) (updated []marshalable, dts []*time.Time) {
for _, item := range items {
if s, found := m.values[*item.name()]; found {
if material := item.material(); material != nil {
if dt := material.unmarshal(s); dt == nil || dt.After(time.Now()) {
updated = append(updated, item)
dts = append(dts, dt)
}
}
}
Expand Down
102 changes: 68 additions & 34 deletions repique/features/dns/nodes/node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ import (
"github.com/AZ-X/pique/repique/features/dns/channels"
"github.com/AZ-X/pique/repique/features/dns/nodes/metrics"
"github.com/AZ-X/pique/repique/services"

stamps "github.com/AZ-X/pique/repique/unclassified/stammel"
smith "github.com/jedisct1/go-clocksmith"
"github.com/jedisct1/dlog"

"github.com/jedisct1/dlog"
"github.com/AZ-X/dns"
)

Expand All @@ -52,7 +50,7 @@ const (
)

type connectivity interface {
boost(*node) *uint32
boost(*node) interface{} // *uint32 or *time.Time
}

type _DNSInterface interface {
Expand Down Expand Up @@ -101,10 +99,9 @@ func (n *node) awaitresolution() bool {
}

func (n *node) awaitboost() bool {
return n.status&(
return n.status&status_broken == 0 && n.status&(
status_outdated|
status_broken |
status_bootstrapping) == status_outdated|status_bootstrapping
status_bootstrapping) != 0
}

func (n *node) dnssec() bool {
Expand Down Expand Up @@ -168,10 +165,22 @@ func (n *node) evaluate() {
}
}

type boostrunnable struct {
n *node
mgr *NodesMgr
}

func (r *boostrunnable) run() {
dlog.Debugf("reboost %s", *r.n.name())
r.n.status|=status_outdated
r.mgr.fetchmaterials(r.n.name())
}

// A simple node manager across all servers
type NodesMgr struct {
*conceptions.SemaGroup
*materials
*smith
metrics.RTT
nodes map[string]*node
q2nodesFunc *[]func(*string, uint8)[]_DNSService
Expand Down Expand Up @@ -352,7 +361,7 @@ func (mgr *NodesMgr) Init(cfg *Config, routes *AnonymizedDNSConfig, sum []byte,
}
newDnscryptNode := func(svr *common.RegisteredServer) (node *dnscryptnode, c node_capable) {
hasDnscrypt = true
c = status_bootstrapping|status_outdated
c = status_outdated
if cfg.DefaultUnavailable {
c|=status_unusable
}
Expand Down Expand Up @@ -412,6 +421,7 @@ func (mgr *NodesMgr) Init(cfg *Config, routes *AnonymizedDNSConfig, sum []byte,
if hasDnscrypt {
dlog.Noticef("dnscrypt-protocol bind to %s", network2)
}
mgr.smith = &smith{}
if len(cfg.ExportCredentialPath) > 0 {
mgr.materials = &materials{}
mgr.open(cfg.ExportCredentialPath, sum)
Expand All @@ -423,9 +433,24 @@ func (mgr *NodesMgr) Init(cfg *Config, routes *AnonymizedDNSConfig, sum []byte,
nodes = append(nodes, n)
}
}
for _, n := range mgr.unmarshalto(nodes) {
updates, dts := mgr.unmarshalto(nodes)
for i, n := range updates {
dlog.Debugf("exported material to %s", *n.name())
n.(*node).status&^=status_outdated|status_bootstrapping
node := n.(*node)
node.status&^=status_outdated|status_bootstrapping
f := func(){
node.status|=status_outdated
}
if cfg.FetchInterval > 0 {
r := &boostrunnable{node, mgr,}
f = r.run
}
if dt := dts[i]; dt != nil {
dlog.Debugf("next expiration for %s on %v", *n.name(), dt)
mgr.addevent(dt, 0, f)
} else if cfg.FetchInterval > 0 {
mgr.addevent(nil, uint32(cfg.FetchInterval)*60 - 5, f)
}
}
}
}
Expand All @@ -434,16 +459,19 @@ func (mgr *NodesMgr) Init(cfg *Config, routes *AnonymizedDNSConfig, sum []byte,
go func(interval time.Duration, least2 bool) {
<-mgr.Ready
close(mgr.Ready)
for {
delay := interval
var f func()
f = func () {
mgr.fetchmaterials()
delay := interval
lives, total := mgr.available()
if least2 && lives <= 1 && total != lives {
delay = 100 * time.Millisecond * time.Duration(total - lives)
delay = time.Duration(total - lives) * time.Second
}
debug.FreeOSMemory()
smith.Sleep(delay)
mgr.addevent(nil, uint32(delay.Seconds()), f)
}
f()
mgr.pilot()
}(time.Duration(common.Max(60, cfg.FetchInterval)) * time.Minute, cfg.FetchAtLeastTwo)
}
}
Expand All @@ -461,7 +489,7 @@ func (mgr *NodesMgr) available() (c int, t int) {

const _DNSRoot = "."
// booster for DoH & DoT
func (mgr *NodesMgr) boost(n *node) *uint32 {
func (mgr *NodesMgr) boost(n *node) interface{} {
var node *tlsnode
var bs_ips *tls_bs_ips
switch n.proto() {
Expand Down Expand Up @@ -522,34 +550,41 @@ func (mgr *NodesMgr) boost(n *node) *uint32 {
return ttl
}

func (mgr *NodesMgr) fetchmaterials() {
if !mgr.BeginExclusive() {
dlog.Warn("semi-refresh occurs")
return
func (mgr *NodesMgr) fetchmaterials(opts ...*string) {
if len(opts) == 0 {
if !mgr.BeginExclusive() {
dlog.Warn("semi-refresh occurs")
return
}
mgr.proveResolution()
mgr.associate()
mgr.EndExclusive()
}
mgr.proveResolution()
mgr.associate()
mgr.EndExclusive()

nodes, rts := make([]*node, 0), make([]chan *uint32, 0)
for _, n := range mgr.nodes {
if n.awaitboost() {
nodes, rts := make([]*node, 0), make([]chan interface{}, 0)
for key, n := range mgr.nodes {
if (len(opts) == 0 || key == *opts[0]) && n.awaitboost() {
nodes = append(nodes, n)
rt := make(chan *uint32)
rt := make(chan interface{})
rts = append(rts, rt)
go func(n1 *node, r chan<- *uint32) {
go func(n1 *node, r chan<- interface{}) {
dlog.Debugf("ready to boost %s", *n1.name())
r <- n1.boost(n1)
close(r)
}(n, rt)
}
}
updates, rtdt := make([]*node, 0), make([]*uint32, 0)
updates := make([]*node, 0)
for c := len(rts) -1; c >= 0; c-- {
rt := <- rts[c]
if rt != nil {
updates = append(updates, nodes[c])
rtdt = append(rtdt, rt)
r := &boostrunnable{nodes[c], mgr,}
if dt, ok := rt.(*time.Time); ok {
mgr.addevent(dt, 0, r.run)
} else {
mgr.addevent(nil, *rt.(*uint32) + uint32(c), r.run)
}
} else {
dlog.Debugf("can not boost %s", *nodes[c].name())
}
Expand All @@ -572,7 +607,9 @@ func (mgr *NodesMgr) fetchmaterials() {
mgr.savepoint()
}
}
mgr.proveResolution()
if len(opts) == 0 {
mgr.proveResolution()
}
mgr.associate()
}

Expand Down Expand Up @@ -698,10 +735,7 @@ func (mgr *NodesMgr) associate() {

func (mgr *NodesMgr) pick(s *channels.Session) _DNSService {
if s.ServerName != nil && *s.ServerName != channels.NonSvrName {
if node := mgr.nodes[*s.ServerName]; node.applicable() {
return node._DNSService
}
return nil
return mgr.nodes[*s.ServerName]._DNSService
}

var candidates []_DNSService
Expand Down
Loading

0 comments on commit 8eeaf2c

Please sign in to comment.