-
Notifications
You must be signed in to change notification settings - Fork 1
/
20080924a.py
136 lines (122 loc) · 4.85 KB
/
20080924a.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""Sample estimated branch lengths using three different information sources.
The first method observes all of the changes
along the branch (infinite sites model).
For the second method multiple changes
are observed as a single change (infinite alleles model).
For the third method multiple changes
are observed as a single change unless they are reverted,
in which case they are observed as no change (Jukes-Cantor model).
"""
from StringIO import StringIO
import math
import random
import numpy as np
from SnippetUtil import HandlingError
import RUtil
import Form
import FormOut
def get_form():
"""
@return: the body of a form
"""
form_objects = [
Form.Float('branch_length', 'branch length',
1.0, low_exclusive=0.0),
Form.Integer('sequence_length', 'sequence length',
1000, low=1)]
return form_objects
def get_form_out():
return FormOut.Report()
def get_response_content(fs):
# get the branch length and the sequence length
branch_length = fs.branch_length
sequence_length = fs.sequence_length
# sample sequence changes at three levels of informativeness
sequence_changes = sample_sequence_changes(branch_length, sequence_length)
# get a distance estimate for each level of informativeness
estimate_a, estimate_b, estimate_c = sample_distance(*sequence_changes)
# begin the response
out = StringIO()
print >> out, 'distance estimates:'
print >> out, 'using all change information:', estimate_a
print >> out, 'without multiple change information:', estimate_b
print >> out, 'without reverted change information:', estimate_c
# return the response
return out.getvalue()
def sample_distance(mean_changes, p_observed, p_nonreverted):
"""
Get a distance esimate for each of three data sources.
@param mean_changes: the
@return: a triple of distance estimates
"""
# when each change is observed, the estimate is the mean number of changes
first_estimate = mean_changes
# when multiple changes are hidden, the distance can still be estimated
if p_observed == 1.0:
second_estimate = float('inf')
else:
second_estimate = -math.log(1 - p_observed)
# when changes can be reverted, the distance can still be estimated
if p_nonreverted >= 0.75:
third_estimate = float('inf')
else:
third_estimate = - .75 * math.log(1 - (4.0/3.0) * p_nonreverted)
return first_estimate, second_estimate, third_estimate
def sample_sequence_changes(branch_length, nsites):
"""
Return three values.
First, the mean number of changes.
Second, the change frequence,
Third, the non-reverted change frequency
The branch length is the expected number of changes
along the branch at each site
@param branch_length: the expected number of changes along the branch
@param nsites: the number of sites in the sequence
@return: the triple of results
"""
samples = [sample_site_changes(branch_length) for i in range(nsites)]
change_counts, observed_changes, nonreverted_changes = zip(*samples)
mean_changes = sum(change_counts) / float(nsites)
p_observed = sum(observed_changes) / float(nsites)
p_nonreverted = sum(nonreverted_changes) / float(nsites)
return mean_changes, p_observed, p_nonreverted
def sample_site_changes(branch_length):
"""
Return three values.
First, the change count.
Second, 1 if any changes.
Third, 1 if non-reverted changes.
@param branch_length: the expected number of changes along the branch
@return: the triple of results
"""
# initialize the random choice table
choice_from = ((1, 2, 3), (0, 2, 3), (0, 1, 3), (0, 1, 2))
# the number of changes is poisson distributed
change_count = np.random.poisson(branch_length)
# note whether or not a (possibly reverted) change was observed
observed_change = min(change_count, 1)
# simulate the changes
state = 0
for i in range(change_count):
state = random.choice(choice_from[state])
# note whether a nonreverted change was observed
nonreverted_change = min(state, 1)
return change_count, observed_change, nonreverted_change
def hard_coded_analysis():
branch_length = 5.0
sequence_length = 1000
nsequences = 1000
estimate_triple_list = []
column_headers = ('most.info', 'less.info', 'least.info')
for i in range(nsequences):
# sample sequence changes at three levels of informativeness
sequence_changes = sample_sequence_changes(
branch_length, sequence_length)
# get a distance estimate for each level of informativeness
estimate_triple = sample_distance(*sequence_changes)
estimate_triple_list.append(estimate_triple)
print RUtil.get_table_string(estimate_triple_list, column_headers)
def main():
hard_coded_analysis()
if __name__ == '__main__':
main()