-
Notifications
You must be signed in to change notification settings - Fork 2
/
replicates.py
114 lines (98 loc) · 4.45 KB
/
replicates.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import sys
import threading
from time import sleep
import parse
import zmq
import functions
class Replicas:
    """Keep every stored file replicated on enough live data keepers.

    The class works on two shared look-up tables, indexed as follows by the
    code below:

    * ``keepers[ip][0]`` maps each port of the data keeper ``ip`` to a busy
      flag (a falsy flag means the port is free), and ``keepers[ip][-1]`` is
      the keeper's "alive" flag.
    * ``videos[file][0]`` maps each data keeper that holds ``file`` to a flag
      telling whether that copy may be used as a replication source.

    ``lv`` and ``lk`` are handed unchanged to the ``functions`` helpers
    together with ``videos`` and ``keepers`` respectively (presumably the
    locks guarding those tables -- confirm against ``functions``).
    """

    def __init__(self, replica_factor, replica_port, period, videos, keepers, lv, lk):
        """Store the replication settings and the shared look-up tables.

        Args:
            replica_factor: minimum number of live keepers that must hold a file.
            replica_port: port on which "replication finished" notices arrive.
            period: seconds to sleep between two replication passes.
            videos: file look-up table (see the class docstring).
            keepers: data-keeper look-up table (see the class docstring).
            lv: passed to ``functions`` calls that update ``videos``.
            lk: passed to ``functions`` calls that update ``keepers``.
        """
        self.replica_factor = replica_factor
        self.replica_port = replica_port
        self.videos = videos
        self.keepers = keepers
        self.lv = lv
        self.lk = lk
        self.period = period

    def get_available_port(self, ip):
        """Return the first port of keeper ``ip`` whose busy flag is falsy, or -1."""
        ports = self.keepers[ip][0]
        return next((port for port in ports if not ports[port]), -1)

    def get_source(self, file):
        """Pick a keeper to copy ``file`` from.

        Returns:
            ``(ip, port)`` of the first holder that is alive, flagged as
            usable for replication and has a free port; ``('', -1)`` if no
            holder qualifies.
        """
        holders = self.videos[file][0]
        for machine in holders.keys():
            # Skip holders that are down or whose copy is not usable as a source.
            if not (self.keepers[machine][-1] and holders[machine]):
                continue
            port = self.get_available_port(machine)
            if port != -1:
                return machine, port
        return '', -1  # no source available

    def get_destination(self, file):
        """Pick a keeper to copy ``file`` to.

        Returns:
            ``(ip, port)`` of the first live keeper that is not already
            listed as a holder of ``file`` and has a free port;
            ``('', -1)`` if there is none.
        """
        holders = self.videos[file][0]
        for ip in self.keepers.keys():
            if not self.keepers[ip][-1] or ip in holders:
                continue
            port = self.get_available_port(ip)
            if port != -1:
                return ip, port
        return '', -1  # no destination available

    def activate(self, ports):
        """Call ``functions.set_busy(..., False)`` for every ``(machine, port)`` pair."""
        for machine, port in ports:
            functions.set_busy(self.keepers, self.lk, machine, port, False)

    def deactivate(self, ports):
        """Call ``functions.set_busy(..., True)`` for every ``(machine, port)`` pair."""
        for machine, port in ports:
            functions.set_busy(self.keepers, self.lk, machine, port, True)

    def inform_replicated(self, replica_port):
        """Serve "replication finished" notices forever on a REP socket.

        Each request must look like
        ``"<file> <src_ip> <src_port> <dst_ip> <dst_port>"``. For a valid
        request the file table is updated, a confirmation is sent back and
        both ports are released. A request that does not match the pattern
        gets an error reply instead of crashing the listener: a REP socket
        has to answer every request, and an uncaught exception here would
        stop all later notices from being processed.
        """
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind('tcp://*:{}'.format(replica_port))
        while True:
            request = socket.recv_string()
            parsed_req = parse.parse('{} {} {} {} {}', request)
            if parsed_req is None:
                # parse.parse() returns None when the text does not match.
                socket.send_string('malformed replication notice: {}'.format(request))
                continue
            file_name, src_ip, src_port, dst_ip, dst_port = (
                str(parsed_req[index]) for index in range(5)
            )
            # Update the look-up table.
            functions.finish_replicate(self.videos, self.lv, file_name, dst_ip)
            socket.send_string(
                'informed that file {} replicated in datakeeper {}'.format(file_name, dst_ip)
            )
            # NOTE(review): the ports arrive here as strings; if the keys of
            # keepers[ip][0] are ints, set_busy may not hit the entry that
            # deactivate() marked busy -- confirm against functions.set_busy.
            self.activate([(src_ip, src_port), (dst_ip, dst_port)])

    def count_file_replicas(self, filename):
        """Return how many keepers listed for ``filename`` are flagged alive."""
        holders = self.videos[filename][0]
        return sum(1 for keeper_ip in holders.keys() if self.keepers[keeper_ip][-1])

    def _request_replica(self, socket, file, source, destination):
        """Ask the destination keeper to pull ``file`` from the source keeper.

        ``source`` and ``destination`` are ``(ip, port)`` pairs. The endpoint
        is always disconnected afterwards, even when the exchange fails, so
        the reused REQ socket does not keep stale peers.
        """
        src_ip, src_port = source
        dst_ip, dst_port = destination
        endpoint = 'tcp://{}:{}'.format(dst_ip, dst_port)
        socket.connect(endpoint)
        try:
            socket.send_string('replica {} {} {}'.format(file, src_ip, src_port))
            response = socket.recv_string()
            # NOTE(review): the table is marked only after the keeper has
            # answered; if a "finished" notice could arrive before this
            # point the two updates would be applied in the wrong order --
            # confirm against the data-keeper protocol.
            functions.start_replicate(self.videos, self.lv, file, dst_ip)
            self.deactivate([source, destination])
            print(response)
        finally:
            socket.disconnect(endpoint)

    def manage_replications(self):
        """Run the replication loop forever.

        Starts the notification listener in a background thread, then every
        ``period`` seconds walks over all files and requests one new replica
        for each file that has fewer than ``replica_factor`` live copies.
        The "all files have N replicas" message is printed only after a pass
        in which no file was found under-replicated.
        """
        rep_thread = threading.Thread(target=self.inform_replicated, args=[self.replica_port])
        rep_thread.start()
        context = zmq.Context()
        socket = context.socket(zmq.REQ)
        while True:
            all_replicated = True
            for file in self.videos.keys():
                if self.count_file_replicas(file) >= self.replica_factor:
                    continue
                all_replicated = False
                src_ip, src_port = self.get_source(str(file))
                if src_port == -1:
                    print('no source machine available...')
                    continue
                dst_ip, dst_port = self.get_destination(str(file))
                if dst_port == -1:
                    print('no destination machine available...')
                    continue
                self._request_replica(socket, file, (src_ip, src_port), (dst_ip, dst_port))
            if all_replicated:
                print('all files have {} replicas, yeah it is done :v'.format(self.replica_factor))
            sleep(self.period)
def init_replica_process(replica_factor, replica_port, period, videos, keepers, lv, lk):
    """Build a ``Replicas`` manager from the given settings and run its loop.

    All arguments are forwarded unchanged to ``Replicas``; the call then
    enters ``manage_replications()``.
    """
    manager = Replicas(
        replica_factor,
        replica_port,
        period,
        videos,
        keepers,
        lv,
        lk,
    )
    manager.manage_replications()