-
Notifications
You must be signed in to change notification settings - Fork 1
/
genbank.py
1107 lines (934 loc) · 41.7 KB
/
genbank.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
### * Description
# Module providing functions and command line scripts to retrieve and process
# data from GenBank
SCRIPT_DESCRIPTION_SEARCH = (""
"Perform a search on GenBank, retrieve document summaries and download full "
"GenBank records.")
SCRIPT_DESCRIPTION_EXTRACT_CDS = (""
"Extract CDS information from GenBank records and produce a summary table. "
"Can also produce a hash for each CDS and output the unique sequences in "
"fasta format. If unique sequences in fasta format are required, use the -u "
"option.")
### * Setup
### ** Import
import os
import time
import argparse
import sys
import xml.etree.ElementTree as ET
import StringIO
import hashlib
import urllib2
import gzip
from Bio import Entrez
from Bio import SeqIO
import pyprind
### ** Default variables
_GB_RECORD_FMTDICT = {
"id" : lambda x: x.id,
"name" : lambda x: x.name,
"dscrp" : lambda x: x.description,
"acc" : lambda x: ",".join(x.annotations["accessions"]),
"date" : lambda x: x.annotations["date"],
"gi" : lambda x: x.annotations["gi"],
"orgn" : lambda x: x.annotations["organism"],
"src" : lambda x: x.annotations["source"]
}
_GB_CDS_FMTDICT = {
"loc" : lambda x: str(x.location),
"prot" : lambda x: ",".join(x.qualifiers.get("translation", ["NA"])),
"nuc": lambda x: str(x.extract(x.parentRecord).seq),
"prod" : lambda x: ",".join(x.qualifiers.get("product", ["NA"])),
"gene" : lambda x: ",".join(x.qualifiers.get("gene", ["NA"])),
"hash" : lambda x: x
}
GET_ATTR_FUNCS = {
# Those are lambda functions taking
# (c, r, h) = (CDS feature, GB record, seq hash)
# Record attributes
"id" : lambda c,r,h: r.id,
"name" : lambda c,r,h: r.name,
"dscrp" : lambda c,r,h: r.description,
"acc" : lambda c,r,h: ",".join(r.annotations["accessions"]),
"date" : lambda c,r,h: r.annotations["date"],
"gi" : lambda c,r,h: r.annotations["gi"],
"orgn" : lambda c,r,h: r.annotations["organism"],
"src" : lambda c,r,h: r.annotations["source"],
# CDS attributes
"loc" : lambda c,r,h: str(c.location),
"prot" : lambda c,r,h: ",".join(c.qualifiers.get("translation", ["NA"])),
"nuc": lambda c,r,h: str(c.extract(r).seq),
"prod" : lambda c,r,h: ",".join(c.qualifiers.get("product", ["NA"])),
"gene" : lambda c,r,h: ",".join(c.qualifiers.get("gene", ["NA"])),
"hash" : lambda c,r,h: h
}
### * Functions
### ** Related to main_search
### *** search(term, retmax)
def search(term, retmax = None) :
"""Search GenBank for a given query string. Perform a Bio.Entrez.esearch on
db="nuccore", using history.
Args:
term (str): Query to submit to GenBank
retmax (int): Maximum number of returned ids
Returns
Bio.Entrez.Parser.DictionaryElement: The result of the search, with
detailed information accessible as if it was a Python dictionary. Keys
are::
"Count", "RetMax", "IdList", "TranslationStack", "QueryTranslation",
"TranslationSet", "RetStart", "QueryKey", "WebEnv"
This object can be used with other function of this module to get the
actual data (:func:`getDocSum` and :func:`_getFullRecord`)
"""
handle = Entrez.esearch(db = "nuccore", term = term, retmax =retmax,
usehistory = "y")
results = Entrez.read(handle)
handle.close()
return results
### *** getDocSum(searchResult, retmax = None)
def getDocSum(searchResult, retmax = None) :
"""Fetch the documents summaries for the entries from an Entrez.esearch.
Args:
searchResult (Bio.Entrez.Parser.DictionaryElement): Object containing
the result of an Entrez.esearch, typically the output from
:func: `search` (or at minimum a dictionary with WebEnv
and QueryKey entries)
retmax (int): Maximum number of document summaries to get. If no number
is given, uses the `RetMax` element from `searchResult`.
Returns:
list of dictionaries: A list of dictionaries containing the document
summaries
"""
if retmax is None:
retmax = searchResult["RetMax"]
docSumsXML = _getDocSumXML(searchResult = searchResult,
retmax = retmax)
docSums = _parseDocSumXML(xmlContent = docSumsXML)
return docSums
### *** getDocSumFromId(listId, retmax = None)
def getDocSumFromId(listId, retmax = None) :
"""Fetch the documents summaries from a list of GenBank identifiers.
Args:
listId (list of str): A list of GenBank identifiers
retmax (int): Maximum number of document summaries to get. If None
(default), returns all the document summaries.
Returns:
list of dictionaries: A list of dictionaries containing the document
summaries
"""
if retmax is None :
retmax = len(listId)
# Epost modified fromt the Biopython cookbook
mySearch = Entrez.read(Entrez.epost(db = "nuccore", id = ",".join(listId)))
docSumsXML = _getDocSumXML(searchResult = mySearch,
retmax = retmax)
docSums = _parseDocSumXML(xmlContent = docSumsXML)
return docSums
### *** _getDocSumXML(searchResult, retmax = None)
def _getDocSumXML(searchResult, retmax = None) :
"""Fetch the documents summaries in XML format for the entries from an
Entrez.esearch.
Args:
searchResult (Bio.Entrez.Parser.DictionaryElement): Object containing
the result of an Entrez.esearch, typically the output from
:func:`search` (or at minimum a dictionary with WebEnv
and QueryKey entries)
retmax (int): Maximum number of document summaries to get. If no number
is given, uses the `RetMax` element from `searchResult`.
Returns:
str: A string containing the summaries in XML format
"""
if retmax is None:
retmax = searchResult["RetMax"]
handle = Entrez.efetch(db = "nuccore", rettype = "docsum", retmode = "xml",
retstart = 0, retmax = retmax,
webenv = searchResult["WebEnv"],
query_key = searchResult["QueryKey"])
data = handle.read()
handle.close()
return data
### *** _getFullRecord(searchResult, retmax)
def _getFullRecord(searchResult, retmax = 1) :
"""Fetch the full GenBank records for the entries from an Entrez.esearch.
Args:
searchResult (Bio.Entrez.Parser.DictionaryElement): Object containing
the result of an Entrez.esearch, typically the output from
:func:`search` (or at minimum a dictionary with WebEnv
and QueryKey entries).
retmax (int): Maximum number of full records to get. Default is 1,
which is a safe approach since individual records can sometimes
be very large (e.g. chromosomes).
Returns:
str: A string containing the full records in XML format
"""
handle = Entrez.efetch(db = "nuccore", rettype = "gb", retmode = "xml",
retstart = 0, retmax = retmax,
webenv = searchResult["WebEnv"],
query_key = searchResult["QueryKey"])
data = handle.read()
handle.close()
return data
### *** _parseDocSumXML(xmlContent)
def _parseDocSumXML(xmlContent) :
"""Parse the documents summaries from xml format into a list of
dictionaries.
Args:
xmlContent (string): Document summaries in XML format (note: this is a
string, not a file name). This is typically the output from
:func:`_getDocSumXML`.
Returns:
list of dictionaries: A list of dictionaries containing the
document summaries, or an empty list if no entry was found.
"""
xmlFile = StringIO.StringIO(xmlContent)
if "<ERROR>Empty result - nothing to do</ERROR>" in xmlFile.getvalue() :
return []
tree = ET.parse(xmlFile)
root = tree.getroot()
docsums = []
for child in root :
entry = dict()
nKeys = 0
for i in child :
if i.tag == "Item" :
if i.text == None :
i.text = "None"
entry[i.attrib["Name"]] = i.text
nKeys += 1
assert nKeys == 12
docsums.append(entry)
return docsums
### *** writeDocSums(docsums, handle)
def writeDocSums(docsums, handle) :
"""Write the documents summaries into a tabular format to any handle with
a `write` method.
Args:
docsums (list of dictionaries or None): A list of dictionaries
containing the document summaries, typically the output from
:func:`parseDocSumXML`. If it is an empty list, the
function will not write anything.
handle (similar to file handle): Handle object with a `write` method
(e.g. open file, sys.stdout, StringIO object)
Returns:
Nothing but writes the document summaries in a tabular format to the
specified file.
"""
if docsums == [] :
return None
headers = docsums[0].keys()
handle.write("#" + "\t".join(headers) + "\n")
for i in docsums :
handle.write("\t".join([i[h] for h in headers]) + "\n")
### *** downloadRecords(idList, destDir, batchSize, delay)
def downloadRecords(idList, destDir, batchSize, delay = 30,
forceDownload = False,
downloadFullWGS = False) :
"""Download the GenBank records for a list of IDs and save them in a
destination folder. This is the function to use to download data from
GenBank. It applies a waiting delay between each batch download. Each
record is saved as a file with name id + ".gb".
Note that a record is not downloaded if a file with the expected name
already exists, except if `forceDownload` is `True`.
The downloading itself is performed by :func:`_downloadBatch` and
:func:`_getRecordBatch`.
some GenBank records do not contain actual sequence data but some reference
to a WGS (whole genome shotgun sequencing) project. For those, setting
`downloadFullWGS` to True is necessary to download another GenBank file
with the actual sequence data. Note that if a GenBank record was first
downloaded without this option, and actually contains a WGS reference, then
the `forceDownload` option must be enabled (or the file must be removed)
for the WGS file to be also downloaded in a new call of this function.
Args:
idList (list of str): List of GenBank id
destDir (str): Path to the folder where the records will be saved
forceDownload (boolean): Should records for which a destination file
already exists be downloaded anyway? (default: `False`)
downloadFullWGS (boolean): If True, also download the full GenBank
files corresponding to GenBank records with WGS trace reference
Returns:
Nothing, but saves the GenBank records in the destination folder
specified by `destDir`.
"""
# Create the destination directory if it doesn't exist
if not os.path.isdir(destDir) :
os.makedirs(destDir)
# Filter the list for only records not already downloaded
existingFileList = os.listdir(destDir)
if forceDownload :
newIdList = idList
else :
newIdList = [x for x in idList if not ((x + ".gb") in existingFileList)]
# Download the batches
bar = pyprind.ProgBar(len(newIdList) / batchSize, monitor=True,
title='Downloading the batches')
for i in range(0, len(newIdList), batchSize):
end = min(len(newIdList), i + batchSize)
batch = newIdList[i: end]
_downloadBatch(batch, destDir, downloadFullWGS)
time.sleep(delay)
bar.update()
### *** _downloadBatch(idBatch, destDir, downloadFullWGS = False)
def _downloadBatch(idBatch, destDir, downloadFullWGS = False) :
"""Download a batch of GenBank records to a destination directory. You should
not call this function directly, but rather use
:func:`downloadRecords` (which itself calls
:func:`_downloadBatch`) for your downloads.
:func:`_downloadBatch` calls :func:`_getRecordBatch` to
download data from GenBank, and then takes care of separating individual
records and writing them to files.
Args:
idBatch (list of str): List of GenBank id
destDir (str): Path to the folder where the records will be saved
downloadFullWGS (boolean): If True, also download the full GenBank
files corresponding to GenBank records with WGS trace reference
Returns:
Nothing, but saves the GenBank records in the destination folder
specified by `destDir`.
"""
# Download the records
r = _getRecordBatch(idBatch)
# Split the downloaded strings into records
r = r.strip().split("\n//")
# print(r)
assert r[-1] == ""
r = r[0: -1]
assert len(r) == len(idBatch)
# Save the records
for i in range(len(idBatch)) :
with open(os.path.join(destDir, idBatch[i] + ".gb"), "w") as fo :
fo.write(r[i].strip())
fo.write("\n//\n")
WGSline = _recordIsWGS(r[i])
if WGSline and downloadFullWGS :
downloadWGS(r[i], destDir)
### *** _recordIsWGS(recordStr)
def _recordIsWGS(recordStr) :
"""Check if a GenBank record is from a whole genome shotgun project. This
is done by searching for the "WGS " string at the beginning of a line
Args:
recordStr (str): Content of a GenBank record
Returns:
str or False: WGS line or False
"""
lines = recordStr.split("\n")
WGS_lines = [x for x in lines if x.startswith("WGS ")]
if len(WGS_lines) == 1 :
return WGS_lines[0]
elif len(WGS_lines) == 0 :
return False
else :
raise Exception("Several lines starting with \"WGS \" in a GenBank record")
### *** _makeWGSurl(WGSline)
def _makeWGSurl(WGSline) :
"""Prepare the url to download the GenBank records corresponding to one
WGS line. The WGS line is the output from
:func:`_genbankRecirdIsWGS`.
Args:
WGSline (str): WGS line from a GenBank record, output from
:func:`_genbankRecirdIsWGS`.
Returns:
str: Url to download the record
"""
if not WGSline.startswith("WGS ") :
raise Exception("Line does not start with \"WGS \"")
accession = WGSline.split(" ")[-1]
accRoot = accession.split("-")[0][0:6]
url = "http://www.ncbi.nlm.nih.gov/Traces/wgs/?download=" + accRoot + ".1.gbff.gz"
return url
### *** _downloadWGS(WGSurl)
def _downloadWGS(WGSurl) :
"""Download a WGS GenBank file. The output is an uncompressed version of the
file.
Args:
WGSurl (str): Url to download the gzip file, output from
:func:`_makeWGSurl`
Returns:
str: Uncompressed GenBank file content. Return None if there was a
problem during gzip decompression.
"""
gzipContent = urllib2.urlopen(WGSurl).read()
gzipFile = StringIO.StringIO(gzipContent)
o = gzip.GzipFile(fileobj = gzipFile)
output = None
try :
output = o.read()
except IOError as e:
print(e)
o.close()
return output
### *** downloadWGS(gbRecord, destDir)
def downloadWGS(gbRecord, destDir) :
"""Download and save the WGS GenBank file corresponding to a GenBank
record with a WGS reference
Args:
gbRecord (str): Text content of a GenBank record with WGS reference
destDir (str): Path to the directory where to save the GenBank file
Returns:
Nothing, but save the complete GenBank file corresponding to the WGS
reference into the specified folder. The file name is the GI number
plus "WGS"
"""
WGS = _recordIsWGS(gbRecord)
assert WGS
url = _makeWGSurl(WGS)
WGS_content = _downloadWGS(url)
gb = SeqIO.read(StringIO.StringIO(gbRecord + "\n//"), "genbank")
gi = gb.annotations["gi"]
filePath = os.path.join(destDir, gi + "_WGS.gb")
with open(filePath, "w") as fo :
fo.write(WGS_content)
return
### *** _getRecordBatch(idList)
def _getRecordBatch(idList) :
"""Retrieve the GenBank records for a list of GenBank id (GIs). This is a
relatively low-level function that only gets data from GenBank but does not
manage batches or write files. You should use the higher level wrapper
:func:`downloadRecords` for your own downloads.
Args:
idBatch (list of str): List of GenBank id
Returns:
str: A string with all the data (can be large if many id are provided)
"""
handle = Entrez.efetch(db = "nuccore", rettype = "gbwithparts",
retmode = "text", id = ",".join(idList))
r = handle.read()
handle.close()
return r
### *** _fileLinesToList(filename)
def _fileLinesToList(filename) :
"""Simple function to get a list of stripped lines from a file.
Args:
filename (str): Path to the file to read
Returns:
list of str: A list containing all non-empty white-stripped lines from
the file
"""
o = []
with open(filename, "r") as fi :
for l in fi :
if l.strip() != "" :
o.append(l.strip())
return o
### ** Related to main_extract_CDS
### *** _summarizeRecord(record, summaryFormat, hashConstructor, existingHashes)
def _summarizeRecord(record, summaryFormat, hashConstructor,
existingHashes = dict()):
"""Produce a tabular summary of all the CDS features present in a GenBank
record and a dictionary containing hashes of the unique sequences (hash,
protein sequence). If a dictionary of pre-existing hashes is given, update
this one. Checks for collisions in the hash dictionary.
Args:
record (Bio.SeqRecord.SeqRecord): GenBank record (Biopython object)
summaryFormat (list of str): List of attribute descriptors determining
the columns of the summary table
hashConstructor (function): Hash algorithm to be used (from the
``hashlib`` module)
existingHashes (dict): Dictionary (k, v) = (hash, protein sequences).
This is updated with the CDS hashes from the input `record` and
checked for collisions.
Returns:
(str, dict, dict): A tuple containing the following objects
* string: tabular summary for all CDS features in the GenBank record,
ready to be written to an output stream
* dictionary (hash, protein sequences): dictionary given in input as
`existingHashes` and updated with the hashes from `record`. This is
the dictionary one can pass to another call to
:func:`_summarizeRecord` in order to progressively build a
complete dictionary of all hashes for several GenBank records.
* dictionary (hash, protein sequences): dictionary containing only
the new hashes not already present in `existingHashes`. This is
useful if one wants to update an output stream with the unique
hashes after each call to :func:`_summarizeRecord` when
processing several records.
"""
list_CDS = [x for x in record.features if x.type == "CDS"]
summary = ""
newHashes = dict()
currentHashes = existingHashes.copy()
currentHashes_keys = set(currentHashes.keys())
for CDS in list_CDS :
(protSeq, h) = _getProteinHashFromCDS(CDS, hashConstructor)
summary += (_makeSummaryForCDS(record, CDS, h, summaryFormat) +
"\n")
if h in currentHashes_keys :
assert protSeq == currentHashes[h]
else :
newHashes[h] = protSeq
currentHashes[h] = protSeq
currentHashes_keys.add(h)
return (summary, currentHashes, newHashes)
### *** _getProteinHashFromCDS(CDS, hashConstructor)
def _getProteinHashFromCDS(CDS, hashConstructor) :
"""Extract the protein sequence from a Bio.SeqFeature.SeqFeature CDS object and
determine its hash value.
Args:
CDS (Bio.SeqFeature.SeqFeature): CDS of interest
hashConstructor (function): Hash algorithm to be used (from the
``hashlib`` module)
Returns:
(str, str): A tuple containing the protein sequence and the hash value.
If there is no translation available in the CDS qualifiers, returns
"NA" as the protein sequence. If there are several translation
available, join them with commas.
"""
protSeq = ",".join(CDS.qualifiers.get("translation", ["NA"]))
h = hashConstructor()
h.update(protSeq)
hStr = h.hexdigest()
return (protSeq, hStr)
### *** _makeSummaryForCDS(record, CDS, hStr, summaryFormat)
def _makeSummaryForCDS(record, CDS, hStr, summaryFormat, getAttrFuncs = None) :
"""Make a summary for one CDS feature object.
Args:
record (Bio.SeqRecord.SeqRecord): Parent record
CDS (Bio.SeqFeature.SeqFeature): CDS of interest
hStr (str): Hash string for the protein sequence of the CDS
summaryFormat (list of str): List of attribute descriptors determining
the columns of the summary table
getAttrFuncs (dict of functions): Dictionary mapping the attribute
descriptors to functions of the form: `f(CDS, record, hStr)`. If None
(default), use the module-defined GET_ATTR_FUNCS dictionary.
Returns:
str: Summary string for this CDS
"""
if getAttrFuncs is None :
getAttrFuncs = GET_ATTR_FUNCS
summaryElements = [getAttrFuncs[x](CDS, record, hStr) for x in summaryFormat]
return "\t".join(summaryElements)
### ** Not related to a main function
### *** _recordInfo(record, outfmt, fmtdict)
def _recordInfo(record, outfmt, fmtdict = None) :
"""Get some information about a GenBank record (stored as a
Bio.SeqRecord.SeqRecord object).
Args:
record (Bio.SeqRecord.SeqRecord): GenBank record
outfmt (list of str): List of information keys
fmtdict (dict): Dictionary mapping the information keys to simple
functions to retrieve the corresponding information. If None, use
the default _GB_RECORD_FMTDICT
Returns:
List: List containing the information corresponding to each key in
`outfmt`
"""
if fmtdict is None :
fmtdict = _GB_RECORD_FMTDICT
return [fmtdict[x](record) for x in outfmt]
### *** _CDSinfo(CDS, outfmt, fmtdict)
def _CDSinfo(CDS, outfmt, fmtdictCDS = None,
fmtdictRecord = None,
parentRecord = None, hashConstructor = None) :
"""Get some information about a GenBank CDS (stored as a
Bio.SeqFeature.SeqFeature object of type "CDS").
Args:
CDS (Bio.SeqFeature.SeqFeature of type "CDS"): GenBank CDS
outfmt (list of str): List of information keys
fmtdictCDS (dict): Dictionary mapping the information keys to simple
functions to retrieve the corresponding information (for CDS
attributes). If None, default is _GB_CDS_FMTDICT.
fmtdictRecord (dict): Dictionary mapping the information keys to simple
functions to retrieve the corresponding information (for record
attributes). If None, default is _GB_RECORD_FMTDICT.
parentRecord (Bio.SeqRecord.SeqRecord): Parent record, needed to extract
nucleotide sequence and other record-related information
hashConstructor (function): Hash algorithm to be used (from the
``hashlib`` module)
Returns:
dict: Dictionary containing the information corresponding to each key
in `outfmt`
"""
if fmtdictCDS is None :
fmtdictCDS = _GB_CDS_FMTDICT
if fmtdictRecord is None :
fmtdictRecord = _GB_RECORD_FMTDICT
CDS.parentRecord = parentRecord
info = dict()
for k in [x for x in outfmt if x in fmtdictCDS.keys()] :
info[k] = fmtdictCDS[k](CDS)
for k in [x for x in outfmt if x in fmtdictRecord.keys()] :
info[k] = fmtdictRecord[k](parentRecord)
if hashConstructor is not None :
protSeq = fmtdictCDS["prot"](CDS)
h = hashConstructor()
h.update(protSeq)
info["hash"] = h.hexdigest()
return info
### * Main scripts functions
### ** pygenbank-search
# Wishlist
#
# pygenbank-search --query "blabla" >> docsum.table
# pygenbank-search --query "blabla" --download
# pygenbank-search --idlist myId.list >> docsum.table
# pygenbank-search --idlist myId.list --download
# pygenbank-search --idlist myId.list --download --outputDir ./GenBank
### *** _main_search(args = None, stdout = None, stderr = None)
def _main_search(args = None, stdout = None, stderr = None) :
"""Main function, used by the command line script "-search". This function
sends the arguments to :func:`_processArgsToLogic_search` to determine
which actions must be performed, and then performs the actions.
Args:
args (namespace): Namespace with script arguments, parse the command
line arguments if None
stdout (file): Writable stdout stream (if None, use `sys.stdout`)
stderr (file): Writable stderr stream (if None, use `sys.stderr`)
Returns:
None
"""
if stdout is None :
stdout = sys.stdout
if stderr is None :
stderr = sys.stderr
# Process arguments
if args is None :
parser = _makeParser_search()
args = parser.parse_args()
args = _processArgsToLogic_search(args, stdout, stderr)
listId = None
# Genbank search
if args.actionFlags.get("DoGenbankSearch", False) :
mySearch = search(term = args.query, retmax = args.retmax)
if args.count :
stdout.write(mySearch["QueryTranslation"] + "\t" + str(mySearch["Count"]) + "\n")
sys.exit(0)
myDocSums = getDocSum(mySearch)
writeDocSums(myDocSums, stdout)
listId = [x["Gi"] for x in myDocSums]
# Get docsums for a list of identifiers
if args.actionFlags.get("DoGetList", False) :
if args.count :
stderr.write("-l and -c cannot be used at the same time\n")
sys.exit(1)
listId = _fileLinesToList(args.listId)
myDocSums = getDocSumFromId(listId)
writeDocSums(myDocSums, stdout)
# Download records
if args.download and not args.count :
assert listId is not None
downloadRecords(idList = listId, destDir = args.outputDir,
batchSize = args.batchSize, delay = args.delay,
forceDownload = args.forceDownload,
downloadFullWGS = args.fullWGS)
### *** _makeParser_search()
def _makeParser_search() :
"""Build the argument parser for the main script "-search".
Returns:
argparse.ArgumentParser() object: An argument parser object ready to be
used to parse the command line arguments
"""
parser = argparse.ArgumentParser(
description = SCRIPT_DESCRIPTION_SEARCH)
parser.add_argument("-c", "--count", action = "store_true",
help = "Just return the number of records, no fetch")
# Required named arguments (http://stackoverflow.com/questions/24180527/argparse-required-arguments-listed-under-optional-arguments)
required = parser.add_argument_group("required named arguments")
# --email
required.add_argument("-e", "--email", type = str,
help = "User's email (required by Entrez)")
# --listId
required.add_argument("-l", "--listId", type = str,
help = "File containing one GenBank identifier per "
"line. Use - for reading from stdin. "
"Exactly one of --listId or "
"--query must be specified, but not both.")
# --query
required.add_argument("-q", "--query", type = str,
help = "Query string for GenBank search. "
"Exactly one of --listId or "
"--query must be specified, but not both.",
metavar = "SEARCH_TERM")
# Download options
download = parser.add_argument_group("download-related options")
# --retmax
download.add_argument("-r", "--retmax", type = int, default = 0,
help = "Maximum number of entries to retrieve from "
"GenBank, comprised between 1 and 10000. Use 0 for "
"unlimited number of returned entries. (default: 0)")
# --download
download.add_argument("-d", "--download", action = "store_true",
help = "Download the full GenBank records")
# --forceDownload
download.add_argument("-f", "--forceDownload", action = "store_true",
help = "Download record even if file already exists "
"(implies --download)")
# --fullWGS
download.add_argument("--fullWGS", action = "store_true",
help = "Also download full WGS sequence data when "
"WGS trace reference is present in a GenBank record "
"(only works if the original GenBank record is to be "
"downloaded too or if --forceDownload is used)")
# --outputDir
download.add_argument("-o", "--outputDir", type = str, default = ".",
help = "Destination folder for downloaded records "
"(default: current directory)")
# --batchSize
download.add_argument("-b", "--batchSize", type = int, default = 5,
help = "Batch size for full record retrieval "
"(default: 5)")
# --delay
download.add_argument("--delay", type = int, default = 15,
help = "Delay in seconds between successive batch "
"retrieval of the full records (default: 15)")
return parser
### *** _processArgsToLogic_search(args, stdout, stderr)
def _processArgsToLogic_search(args, stdout, stderr) :
"""Process the command line arguments and determine the action logic for the
:func:`_main_search` function.
Args:
args (namespace): Argument namespace
stdout (file): stdout stream
stderr (file): stderr stream
Returns:
namespace: The argument namespace given input in `args` with added
flags for actions
"""
if args.forceDownload :
args.download = True
# Initiliaze action flags
args.actionFlags = dict()
# --query and --listId
if (args.query is not None) and (args.listId is not None) :
stdout.write("--query and --listId options cannot be specified "
"simultaneously\n"
"Use --help for details on usage.")
sys.exit()
# --query and no --listId
elif (args.query is not None) and (args.listId is None) :
_checkRetmax(args.retmax, stderr)
_checkEmailOption(args, stderr)
args.actionFlags["DoGenbankSearch"] = True
# no --query and --listId
elif (args.query is None) and (args.listId is not None) :
_checkEmailOption(args, stderr)
args.actionFlags["DoGetList"] = True
# no --query and no --listId
else :
assert (args.query is None) and (args.listId is None)
stderr.write("Please specify either --listId or --query\n"
"Use --help for details on usage.\n")
sys.exit()
return args
### *** _checkEmailOption(args, stderr = None)
def _checkEmailOption(args, stderr = None) :
"""Check that an email option was provided and setup Entrez email, produce
a message and exit if not.
Args:
args (namespace): Output from `parser.parse_args()`
stderr (file): Writable stderr stream (if None, use `sys.stderr`)
Returns:
Nothing, but setup Entrez.email or exit the program with a message to
stderr if no email was provided
"""
if stderr is None :
stderr = sys.stderr
if args.email is None :
stderr.write("To make use of NCBI's E-utilities, NCBI requires you to specify\n"
"your email address with each request. In case of excessive\n"
"usage of the E-utilities, NCBI will attempt to contact a user\n"
"at the email address provided before blocking access to the\n"
"E-utilities.")
stderr.write("\nPlease provide your email address using --email")
sys.exit()
Entrez.email = args.email
return
### *** _checkRetmax(retmax, stderr)
def _checkRetmax(retmax, stderr) :
if retmax > 10000 :
stderr.write("Error: --retmax cannot be greater than 10000.\n\n")
stderr.write("Use --retmax 0 for unlimited number of returned results.\n\n"
"You can check how many entries your search term\n"
"returns on the GenBank website\n"
"(http://www.ncbi.nlm.nih.gov/genbank/)\n")
sys.exit()
### ** pygenbank-extract-CDS
# Wishlist
#
# pygenbank-extract-CDS --outfmt orgn,gi,cds,nuc,pos,hash --hash sha512 --summary mySummaries *.gb
# pygenbank-extract-CDS *.gb > mySummaries # default outfmt and --summary
# pygenbank-extract-CDS --unique myUniqueCDS --hash md5sum *.gb > mySummaries # produce fasta
# pygenbank-extract-CDS --unique myUniqueCDS --hash md5sum --outfmt org,gi,cds,nuc,pos,hash *.gb > mySummaries
# pygenbank-extract-CDS --unique myUniqueCDS --hash md5sum --summary mySummaries --outfmt org,gi,cds,nuc,pos,hash *.gb
### *** _main_extract_CDS(args = None, stdout = None, stderr = None,
# gb_record_fmtdict, gb_cds_fmtdict)
def _main_extract_CDS(args = None, stdout = None, stderr = None,
gb_record_fmtdict = None,
gb_cds_fmtdict = None) :
"""Main function, used by the command line script "-extract-CDS". This function
sends the arguments to :func:`_processArgsToLogic_extract_CDS` to determine
which actions must be performed, and then performs the actions.
Args:
args (namespace): Namespace with script arguments, parse the command
line arguments if None
stdout (file): Writable stdout stream (if None, use `sys.stdout`)
stderr (file): Writable stderr stream (if None, use `sys.stderr`)
gb_record_fmtdict (dict): Dictionary mapping outfmt specifiers to
functions to extract the corresponding information from GenBank
records. If None, default is _GB_RECORD_FMTDICT.
gb_cds_fmtdict (dict): Dictionary mapping outfmt specifiers to
functions to extract the corresponding information from GenBank
CDS. If None, default is _GB_CDS_FMTDICT.
Returns:
None
"""
if stdout is None :
stdout = sys.stdout
if stderr is None :
stderr = sys.stderr
if gb_record_fmtdict is None :
gb_record_fmtdict = _GB_RECORD_FMTDICT
if gb_cds_fmtdict is None :
gb_cds_fmtdict = _GB_CDS_FMTDICT
# Process arguments
if args is None :
parser = _makeParser_extract_CDS()
args = parser.parse_args()
args = _processArgsToLogic_extract_CDS(args, stdout, stderr,
gb_record_fmtdict, gb_cds_fmtdict)
# Go through the input files
uniqueSeq = dict()
i_file = 0
for fi in args.genbank_records :
i_file += 1
if args.verbose :
stderr.write(time.asctime() + " - " +
"Processing file " + str(i_file) + " : " +
os.path.basename(fi) + " - " +
"N unique seq : " + str(len(uniqueSeq.keys())) + "\n")
record = SeqIO.parse(fi, "genbank")
for r in record :
if not args.actionFlags.get("DoCount", False) :
(summaryString, uniqueSeq, newSeq) = (
_summarizeRecord(r, args.outfmt, args.hash, uniqueSeq))
stdout.write(summaryString)
else :
count = len([x for x in r.features if x.type == "CDS"])
stdout.write(r.annotations["gi"] + "\t" + str(count) + "\n")
# Write unique sequences
if args.actionFlags.get("DoUniqueSequences", False) :
with open(args.unique, "w") as fo :
for (k, v) in uniqueSeq.items() :
fo.write(">" + k + "\n")
fo.write(v + "\n")
### *** _makeParser_extract_CDS()
def _makeParser_extract_CDS() :
"""Build the argument parser for the main script "-extract-CDS".
returns:
argparse.ArgumentParser() object: An argument parser object ready to be
used to parse the command line arguments
"""
parser = argparse.ArgumentParser(
description = SCRIPT_DESCRIPTION_EXTRACT_CDS)
# input files
parser.add_argument("genbank_records", type = str, nargs = "+",
help = "Filename(s) for GenBank record(s)")
# --hash
parser.add_argument("--hash", metavar = "HASH_ALGORITHM",
choices = ["md5", "sha1", "sha224", "sha256", "sha384",
"sha512"],
default = "md5",
help = "Hash algorithm to use for unique sequence signature "
"(default md5)")
# --unique
parser.add_argument("-u", "--unique", metavar = "UNIQUE_FASTA",
type = str, default = None,
help = "Filename for unique fasta amino acid sequences")
# --outfmt
parser.add_argument("-o", "--outfmt", metavar = "FORMAT",
type = str, default = "dscrp,gi,loc,hash,prot",
help = "Comma-separated list of output column "
"identifiers for summaries. Allowed identifiers are: "
"id, name, dscrp, acc, date, gi, orgn, src (record "