-
Notifications
You must be signed in to change notification settings - Fork 1
/
pyin.py
1380 lines (1024 loc) · 40.6 KB
/
pyin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Like ``sed``, but Python."""
import abc
import argparse
from collections import deque
from collections.abc import Iterable
import builtins
import csv
import functools
import importlib.util
import inspect
import io
import itertools as it
import json
import operator as op
import os
import re
import signal
import sys
import traceback
__version__ = '1.0dev'
__author__ = 'Kevin Wurster'
__license__ = '''
New BSD License
Copyright (c) 2015-2024, Kevin D. Wurster
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* The names of pyin its contributors may not be used to endorse or
promote products derived from this software without specific prior written
permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
# Default name bound to the item currently being processed when an expression
# is evaluated.
_DEFAULT_VARIABLE = 'i'
# Default name bound to the entire stream for expressions operating on the
# stream itself.
_DEFAULT_STREAM_VARIABLE = 's'
# Directive assumed by 'compile()' for any token that is not itself a
# directive.
_EVAL_DIRECTIVE = '%eval'
# Matches dotted identifiers in an expression. 'importer()' treats the first
# component of every match as a module that might need importing.
_IMPORTER_REGEX = re.compile(r"([a-zA-Z_.][a-zA-Z0-9_.]*)")
# Maps a directive like '%eval' to the operation class servicing it. Populated
# by 'OpBase.__init_subclass__()' as subclasses are defined.
_DIRECTIVE_REGISTRY = {}
# Baseline names that 'eval()' merges into the scope before evaluating any
# expressions.
_DEFAULT_SCOPE = {
    '__builtins__': builtins,
    'it': it,
    'op': op,
    'reduce': functools.reduce
}
class DirectiveError(RuntimeError):
    """Raised when a directive cannot be handled.

    The offending directive is kept on the ``directive`` attribute and is
    included in the exception message.
    """

    def __init__(self, directive):
        """
        :param str directive:
            The directive that was rejected.
        """
        message = f"invalid directive: {directive}"
        super().__init__(message)
        self.directive = directive
def _normalize_expressions(f):
    """Decorator letting a function accept one expression or several.

    The decorated function always receives its first positional argument,
    ``expressions``, as a ``tuple``. A lone string becomes a one-element
    tuple, and any other iterable is converted with ``tuple()``.

    :param callable f:
        Function to wrap. Its first positional argument must be
        ``expressions``.

    :rtype callable:

    :return:
        The wrapping function.

    :raises TypeError:
        Raised by the wrapper when ``expressions`` is neither a string nor
        an iterable.
    """

    @functools.wraps(f)
    def inner(expressions, *args, **kwargs):
        # Strings are iterable too, so they must be checked for first.
        if isinstance(expressions, str):
            normalized = (expressions, )
        elif isinstance(expressions, Iterable):
            normalized = tuple(expressions)
        else:
            raise TypeError(f"not a sequence: {expressions=}")
        return f(normalized, *args, **kwargs)

    return inner
@_normalize_expressions
def compile(
        expressions,
        variable=_DEFAULT_VARIABLE,
        stream_variable=_DEFAULT_STREAM_VARIABLE,
        scope=None):
    """Compile expressions to operation classes.

    An operation class is a subclass of ``OpBase()``. Tokens are consumed
    left to right. A token starting with ``%`` is looked up as a directive,
    and anything else is treated as the argument of an implicit ``%eval``.

    :param str or sequence expressions:
        One or more expressions to compile.
    :param str variable:
        Operations should use this variable when inserting an item into
        a scope during evaluation.
    :param str stream_variable:
        Like ``variable`` but when referencing the entire data stream.
    :param dict or None scope:
        Scope handed to operations that evaluate Python expressions. A new
        ``dict`` is created when this is ``None``.

    :rtype tuple:

    :return:
        A sequence of compiled operations. An operation is an instance of
        a subclass of ``OpBase()``.

    :raises SyntaxError:
        If a token is empty or consists only of white space.
    :raises ValueError:
        If a directive is not registered, or the tokens run out before all
        of a directive's arguments have been read.
    """

    compiled = []

    # 'scope = scope or {}' and 'if scope is None' both create a new dict
    # when the caller does not pass one. They differ when the caller passes
    # an empty dict: the former silently swaps it for a new dict, making it
    # impossible to update an existing empty scope, while the latter keeps
    # using the caller's object.
    if scope is None:
        scope = {}

    # Tokens are only ever taken from or pushed back onto the left end, which
    # is O(1) on a deque but O(n) on a list.
    tokens = deque(expressions)
    del expressions

    while tokens:

        # Get a directive
        directive = tokens.popleft()

        if directive == '' or directive.isspace():
            raise SyntaxError(
                f'expression is white space or empty: {repr(directive)}'
            )

        # If it is not actually a directive just assume it is a Python
        # expression that should be evaluated. Stick the token back in the
        # queue so that it can be evaluated as an argument - makes the rest
        # of the code simpler.
        if directive[0] != '%':
            tokens.appendleft(directive)
            directive = _EVAL_DIRECTIVE

        if directive not in _DIRECTIVE_REGISTRY:
            raise ValueError(f'invalid directive: {directive}')

        cls = _DIRECTIVE_REGISTRY[directive]

        # Operation classes define how many arguments are associated with the
        # directives they service with annotated positional-only arguments.
        # Find them.
        sig = inspect.signature(cls.__init__)
        pos_only = [
            p for p in sig.parameters.values()
            if p.kind == p.POSITIONAL_ONLY
        ]
        pos_only = pos_only[1:]  # First is 'self'

        # Arguments for instantiating the operation class. The first
        # positional-only parameter is the directive itself, so only the
        # remaining ones are read from the token queue.
        args = [directive]
        for param in pos_only[1:]:

            # Ran out of CLI arguments but expected more
            if not tokens:
                raise ValueError(
                    f"missing argument '{param.name}' for directive:"
                    f" {directive}")

            # The annotation doubles as the function casting the raw token.
            args.append(param.annotation(tokens.popleft()))

        # 'OpBaseExpression()' is special in that it receives scope information
        # for Python's builtin 'eval()' and 'exec()' functions, and associated
        # variables.
        kwargs = {}
        if issubclass(cls, OpBaseExpression):
            kwargs.update(
                variable=variable,
                scope=scope,
                stream_variable=stream_variable
            )

        compiled.append(cls(*args, **kwargs))

    return tuple(compiled)
@_normalize_expressions
def importer(expressions, scope):
    """Import modules referenced by expressions into a scope.

    Every dotted identifier found in ``expressions`` is treated as a
    potential module reference. For something like ``"os.path.exists(i)"``
    the leading component, ``os``, is imported and stored in ``scope`` under
    that name. Names that are attributes of ``builtins`` are skipped.

    :param str or sequence expressions:
        One or more Python expressions.
    :param dict scope:
        Imported modules are written into this dictionary.

    :rtype dict:

    :return:
        The same ``scope`` object that was passed in.

    :raises ImportError:
        If importing fails for a module that ``importlib.util.find_spec()``
        can nonetheless locate.
    """

    # Collect every candidate once, regardless of how many expressions
    # mention it.
    candidates = set()
    for expr in expressions:
        candidates.update(_IMPORTER_REGEX.findall(expr))

    for match in candidates:

        # 'match' could be something like:
        #   json.dumps
        #   collections.OrderedDict.items
        # Only the leading component is imported.
        module, _, _ = match.partition('.')

        # Skip what is certainly not a module: an empty name produced by a
        # leading '.', or something that is already a builtin.
        if module == '' or hasattr(builtins, module):
            continue

        try:
            imported = importlib.import_module(module)

        # Most failures just mean the identifier was never a module. However,
        # if a spec for the module can be found, then the import itself is
        # what went wrong, and the caller should know.
        except ImportError:  # pragma no cover
            if importlib.util.find_spec(module) is None:
                continue
            raise ImportError(
                f"attempting to import something that cannot be imported"
                f" from a module that does exist: {match}"
            )  # pragma no cover

        else:
            scope[module] = imported

    return scope
@_normalize_expressions
def eval(
        expressions,
        stream,
        scope=None,
        variable=_DEFAULT_VARIABLE,
        stream_variable=_DEFAULT_STREAM_VARIABLE
):
    """Evaluate Python expressions across a stream of data.

    The scope is seeded with the default names, ``importer()`` adds any
    modules referenced by ``expressions``, and ``compile()`` converts the
    expressions into operations. Each operation wraps the output of the
    previous one, starting with ``stream``. This function is a generator,
    so none of that happens until the first item is requested.

    :param str or sequence expressions:
        One or more expressions.
    :param iterable stream:
        Data to pass through the operations.
    :param dict or None scope:
        A scope for Python's builtin ``eval()``. It is updated in place with
        the default names, a ``_scope`` key referencing the scope itself,
        and any imported modules. A new ``dict`` is used when ``None``.
    :param str variable:
        Each item in ``stream`` should be stored in this variable in the
        scope.
    :param str stream_variable:
        Like ``variable`` but for referencing ``stream`` itself.

    :return:
        An iterator of results.
    """

    scope = {} if scope is None else scope

    # Standard baseline of names available to every expression.
    scope.update(_DEFAULT_SCOPE)

    # The scope holds a reference to itself so that expressions can discover
    # what is available to them.
    scope['_scope'] = scope

    importer(expressions, scope=scope)

    operations = compile(
        expressions,
        variable=variable,
        stream_variable=stream_variable,
        scope=scope
    )

    # Feed the output of each operation into the next one.
    yield from functools.reduce(
        lambda data, operation: operation(data), operations, stream)
###############################################################################
# Operations
def _peek(iterable):
    """Look at the first item of an iterable without losing it.

    :param iterable iterable:
        Get the first item from this iterable.

    :return:
        A ``tuple`` with two elements. The first is the first value from
        ``iterable``, and the second is an iterator producing every item
        of ``iterable`` including that first value.

    :raises StopIteration:
        If ``iterable`` is empty.
    """

    iterator = iter(iterable)
    head = next(iterator)

    # Put the consumed item back in front of what remains.
    return head, it.chain((head, ), iterator)
class OpBase(abc.ABC):
    """Base class for defining an operation.

    Subclassers can use positional-only arguments and type annotations in
    ``__init__`` to define arguments associated with the directive and
    their type.

    Subclasses are declared with a ``directives`` class keyword argument:

    .. code:: python

        class OpUpper(OpBase, directives=('%upper', )):
            ...

    Subclassers are free to reference a variety of attributes on their instance
    that contain a variety of information about how they should execute:

    directive
        The directive currently being executed.

    directives
        All directives supported by the class. Passing ``directives=None``
        when subclassing disables registration, which is useful for base
        classes that extend ``OpBase()``.
    """

    directives = None

    def __init__(
            self,
            directive: str,
            # The slash below is significant! Its presence makes the preceding
            # args positional-only argument, which we look for elsewhere.
            /
    ):
        """
        :param str directive:
            The directive actually used in the expressions. Some operation
            classes can support multiple directives.

        :raises RuntimeError:
            If ``directive`` is not one of the directives supported by this
            class.
        """

        self.directive = directive

        # 'directives' is 'None' for classes that opted out of registration.
        # Such classes support no directives at all.
        supported = self.directives or ()
        if self.directive not in supported:
            raise RuntimeError(
                f"instantiated '{repr(self)}' with directive"
                f" '{self.directive}' but supports:"
                f" {' '.join(supported)}"
            )

    def __init_subclass__(cls, /, directives, **kwargs):
        """Validate a subclass and register its directives.

        Also populates the subclass's ``directives`` class variable. Every
        directive is validated before any of them is registered, so a
        failure leaves the registry untouched.

        :param sequence or str or None directives:
            Directives supported by this class. Like ``('%upper', '%lower')``.
            A single string is treated as one directive. Use ``None`` to
            skip registration.
        :param **kwargs kwargs:
            Additional arguments for the parent implementation.

        :raises RuntimeError:
            If ``__init__`` has no positional-only arguments besides
            ``self``, if one of them lacks a type annotation, if a directive
            is not prefixed with a single ``%``, or if a directive is
            already registered.
        """

        # First validate subclass
        sig = inspect.signature(cls.__init__)

        # Positional-only arguments are used to define arguments for a
        # directive.
        pos_only = [
            p for p in sig.parameters.values()
            if p.kind == p.POSITIONAL_ONLY
        ]
        pos_only = pos_only[1:]  # First is 'self'

        if not pos_only:
            raise RuntimeError(
                f"{cls.__name__}.__init__() is malformed and lacks the"
                f" positional-only arguments used for determining directive"
                f" arguments"
            )

        # Positional arguments _must_ be type hinted for casting purposes.
        for param in pos_only:
            if param.annotation is inspect.Parameter.empty:
                raise RuntimeError(
                    f"argument '{param.name}' for directive"
                    f" '{cls.__name__}.__init__()' must have a type annotation"
                )

        # Register subclass
        super().__init_subclass__(**kwargs)

        if directives is None:
            return

        # Iterating over a bare string would produce single characters, and
        # a lone '%' would pass validation.
        if isinstance(directives, str):
            directives = (directives, )

        # Validate everything before touching the registry.
        seen = set()
        for d in directives:
            if not d.startswith('%') or d.count('%') != 1:
                raise RuntimeError(
                    f"directive '{d}' for class '{cls.__name__}' is not"
                    f" prefixed with a single '%'")
            elif d in _DIRECTIVE_REGISTRY or d in seen:
                raise RuntimeError(
                    f"directive '{d}' conflict:"
                    f" {cls} {_DIRECTIVE_REGISTRY.get(d, cls)}")
            seen.add(d)

        cls.directives = directives
        for d in directives:
            _DIRECTIVE_REGISTRY[d] = cls

    def __repr__(self):
        """Approximate representation of operation instance."""

        return f"<{self.__class__.__name__}({self.directive}, ...)>"

    @abc.abstractmethod
    def __call__(self, stream):
        """Process a stream of data.

        Implementation must:

        1. Treat ``stream`` as an iterable object and be otherwise agnostic
           to its type. Iterating directly as a ``for`` loop, or wrapping
           as a generator via ``(i for i in stream)`` are both appropriate.
        2. Consume all items in ``stream``.
        3. Be a generator or return an iterator.
        4. Be prepared for the input ``stream`` to not contain any data.

        An implementation should also be conscious of function call overhead.
        ``pyin`` primarily seeks to be friendly and convenient, but fast is
        also nice.

        :param stream:
            Input data stream. An iterable object.

        :return:
            An iterable object.
        """

        raise NotImplementedError  # pragma no cover
class OpBaseExpression(OpBase, directives=None):
    """Base class for operations that evaluate a Python expression.

    Typically by ``eval()`` or ``exec()``. Registers no directives itself.
    """

    def __init__(
            self,
            directive: str,
            expression: str,
            /,
            variable,
            stream_variable,
            scope
    ):
        """
        :param str directive:
            See parent implementation.
        :param str expression:
            Python source code for the operation to evaluate.
        :param str variable:
            Operations executing expressions with Python's ``eval()`` should
            place data in this variable in the scope.
        :param str stream_variable:
            Like ``variable`` but for referencing the full stream of data.
        :param dict scope:
            Operations executing expressions with Python's ``eval()`` should
            use this global scope.
        """

        super().__init__(directive)

        self.scope = scope
        self.variable = variable
        self.stream_variable = stream_variable
        self.expression = expression

    def compiled_expression(self, mode):
        """Compile the expression using the builtin ``compile()``.

        :param str mode:
            Passed to the builtin ``compile()`` as its ``mode`` argument.

        :return:
            The object returned by the builtin ``compile()``.
        """

        source = self.expression
        return builtins.compile(source, '<string>', mode)
class OpEval(OpBaseExpression, directives=('%eval', '%stream', '%exec')):
    """Evaluate a Python expression with ``eval()`` or ``exec()``.

    ``%eval`` evaluates the expression once per item and emits the result.
    ``%stream`` evaluates the expression once against the whole stream and
    emits each element of the result. ``%exec`` executes the expression as
    statements once per item and emits whatever the item variable holds
    afterwards.

    This operation receives special treatment in ``compile()``, but its
    subclassers do not. When parsing the input expressions, anything not
    associated with a directive is assumed to be a generic Python expression
    that should be handled by this class.

    In code terms, this:

    .. code:: python

        >>> import pyin
        >>> list(pyin.eval('i + 1', range(3)))
        [1, 2, 3]

    is equivalent to:

    .. code:: python

        >>> import pyin
        >>> list(pyin.eval(['%eval', 'i + 1'], range(3)))
        [1, 2, 3]
    """

    def __call__(self, stream):

        directive = self.directive

        # Compilation happens up front so that a bad expression is reported
        # even when 'stream' is empty and none of the loops below run. The
        # mode is only known here since one class serves both 'eval()' and
        # 'exec()'.
        mode = 'exec' if directive == '%exec' else 'eval'
        code = self.compiled_expression(mode)

        global_scope = self.scope

        if directive == '%eval':
            name = self.variable
            for item in stream:
                yield builtins.eval(code, global_scope, {name: item})

        elif directive == '%stream':
            # The input can be any iterable object. Hand the expression an
            # iterator so that it always sees the same kind of object.
            iterator = (i for i in stream)
            local_scope = {self.stream_variable: iterator}
            yield from builtins.eval(code, global_scope, local_scope)

        elif directive == '%exec':
            # 'exec()' runs statements that modify the local scope in place,
            # so the item has to be read back out afterwards. The same local
            # scope is shared across all items.
            name = self.variable
            local_scope = {}
            for item in stream:
                local_scope[name] = item
                builtins.exec(code, global_scope, local_scope)

                # The statements may have run 'del variable', in which case
                # nothing is emitted for this item.
                if name in local_scope:
                    yield local_scope[name]

        else:  # pragma no cover
            raise DirectiveError(directive)
class OpEvalIf(OpBaseExpression, directives=('%evalif', '%execif')):
    """Like ``OpEval()``, but the expression only runs for selected items.

    Does not filter. When the sentinel expression evaluates as falsey for
    an item, that item is emitted unchanged and the expression is not
    evaluated against it.
    """

    def __init__(
            self,
            directive: str,
            sentinel_expression: str,
            expression: str,
            /,
            variable,
            stream_variable,
            scope
    ):
        """See base class for most parameters.

        :param str sentinel_expression:
            Determines if ``expression`` should be evaluated.
        """

        super().__init__(
            directive,
            expression,
            variable=variable,
            stream_variable=stream_variable,
            scope=scope
        )

        self.sentinel_expression = sentinel_expression

    def __call__(self, stream):

        shared = {
            'variable': self.variable,
            'stream_variable': self.stream_variable,
            'scope': self.scope,
        }

        # One copy of the stream feeds the sentinel expression. The other is
        # shared between the evaluator and the pass-through branch below, so
        # whichever is chosen for an item consumes exactly that item.
        for_sentinel, stream = it.tee(stream, 2)

        selector = OpEval('%eval', self.sentinel_expression, **shared)

        # Dropping the trailing 'if' from this operation's directive gives
        # the 'OpEval()' directive to delegate to.
        evaluator = OpEval(self.directive[:-2], self.expression, **shared)

        selection = selector(for_sentinel)
        evaluated = evaluator(stream)

        for sentinel in selection:
            source = evaluated if sentinel else stream
            yield next(source)

        # Both iterators should be spent by now. If either has something
        # left, then something is wrong.
        for hint, data in (('selection', selection), ('evaluated', evaluated)):
            try:
                next(data)
            except StopIteration:
                continue
            raise RuntimeError(f'failed to exhaust: {hint}')  # pragma no cover
class OpFilter(OpBaseExpression, directives=('%filter', '%filterfalse')):
    """Filter data based on a Python expression.

    ``%filter`` keeps items for which the expression is truthy, and
    ``%filterfalse`` keeps items for which it is falsey. These are
    equivalent:

        %filter "i > 2"
        %filterfalse "i <= 2"

    The expression ``None`` (in any letter case) is special and tests the
    truthiness of the item itself, like ``filter(None, ...)``.
    """

    def __call__(self, stream):

        if self.directive == '%filter':
            selector = filter
        elif self.directive == '%filterfalse':
            selector = it.filterfalse
        else:  # pragma no cover
            raise DirectiveError(self.directive)

        # 'filter()' and 'itertools.filterfalse()' both treat a predicate of
        # 'None' as a request to test the items themselves.
        if self.expression.lower() == 'none':
            return selector(None, stream)

        # Compile once up front rather than once per item. This also reports
        # a bad expression even if 'stream' turns out to be empty.
        code = self.compiled_expression('eval')
        scope = self.scope
        variable = self.variable

        def predicate(item):
            return builtins.eval(code, scope, {variable: item})

        return selector(predicate, stream)
class OpAccumulate(OpBase, directives=('%accumulate', )):
    """Collect the entire stream into a single ``list``."""

    def __call__(self, stream):

        items = list(stream)

        # An unconditional 'yield list(stream)' would turn an empty stream
        # into a stream containing one empty list. Emit nothing instead.
        if not items:
            return

        yield items
class OpChain(OpBase, directives=('%chain', )):
    """Flatten the stream by one level – like ``itertools.chain()``."""

    def __call__(self, stream):
        # Each item in 'stream' must itself be iterable.
        flatten = it.chain.from_iterable
        return flatten(stream)
class OpJSON(OpBase, directives=('%json', )):
    """Serialize/deserialize JSON data.

    The direction is chosen by looking at the first item only. If it is a
    string the stream is assumed to be JSON and every item is deserialized.
    Otherwise, every item is serialized.
    """

    def __call__(self, stream):

        try:
            first, stream = _peek(stream)
        except StopIteration:
            # Operations must return an iterator, not just an iterable, even
            # when there is no data.
            return iter(())

        # 'json.loads/dumps()' both use these objects internally, but create
        # an instance with every call. Presumably this is faster.
        if isinstance(first, str):
            func = json.JSONDecoder().decode
        else:
            func = json.JSONEncoder().encode

        return map(func, stream)
class OpCSVDict(OpBase, directives=('%csvd', )):
    """Read/write data via ``csv.DictReader()`` and ``csv.DictWriter()``.

    The direction is chosen by looking at the first item. Text is parsed
    with the default ``csv.DictReader()`` settings. Anything else is written
    as a header followed by rows, with "quote all" enabled, using the keys
    of the first item as the field names.
    """

    def __call__(self, stream):

        try:
            first, stream = _peek(stream)
        except StopIteration:
            return

        # Writing to a CSV
        if not isinstance(first, str):

            # The writer hands back whatever its file object's 'write()'
            # returns. A file-like object that just returns the text it was
            # given turns each write into a line that can be emitted.
            class _Passthrough:
                def write(self, data):
                    return data

            writer = csv.DictWriter(
                _Passthrough(),
                fieldnames=list(first.keys()),
                quoting=csv.QUOTE_ALL,
                lineterminator='',  # pyin itself handles newline characters
            )

            yield writer.writeheader()
            yield from map(writer.writerow, stream)

        # Reading from a CSV
        else:
            yield from csv.DictReader(stream)
class OpReversed(OpBase, directives=('%rev', '%revstream')):
    """Reverse each item, or reverse the order of the entire stream."""

    def __call__(self, stream):

        # Python's 'reversed()' returns an iterator that is only really
        # useful when consumed immediately, so results are materialized
        # here to be more helpful.

        # Reverse entire stream
        if self.directive in ('%revstream', '%reversedstream'):

            # Items are popped off of the queue as they are emitted, which
            # avoids holding two copies of the input data in memory.
            pending = deque(stream)
            while pending:
                yield pending.pop()

        # Reverse each item
        elif self.directive in ('%rev', '%reversed'):

            try:
                first, stream = _peek(stream)
            except StopIteration:
                return

            # The type of the first item selects the strategy for the whole
            # stream. Slicing reverses these types while preserving the
            # original type.
            if isinstance(first, (str, list, tuple)):
                for item in stream:
                    yield item[::-1]
            else:
                for item in stream:
                    yield tuple(reversed(item))

        else:  # pragma no cover
            raise DirectiveError(self.directive)
class OpBatched(OpBase, directives=('%batched', )):
    """Group stream into chunks with no more than N elements.

    Equivalent to ``itertools.batched()``. Each chunk is a ``tuple``, and
    the final chunk may contain fewer than N elements.
    """

    def __init__(self, directive: str, chunksize: int, /, **kwargs):
        """
        :param str directive:
            See parent implementation.
        :param int chunksize:
            Maximum number of items to include in each "batch". Must be at
            least 1.
        :param **kwargs kwargs:
            See parent implementation.

        :raises ValueError:
            If ``chunksize`` is less than 1.
        """

        super().__init__(directive, **kwargs)

        # A chunk size of 0 would emit nothing and leave the stream
        # unconsumed, and a negative value would only fail once iteration
        # starts. Fail early like 'itertools.batched()' does.
        if chunksize < 1:
            raise ValueError(f"chunksize must be at least 1: {chunksize}")

        self.chunksize = chunksize

    def __call__(self, stream):

        # 'itertools.batched()' was introduced in Python 3.12 and cannot
        # be used

        # Slicing must repeatedly draw from one shared iterator.
        stream = iter(stream)
        while chunk := tuple(it.islice(stream, self.chunksize)):
            yield chunk
class OpStrNoArgs(OpBase, directives=(
        '%split', '%lower', '%upper', '%strip', '%lstrip', '%rstrip')):
    """Text processing that doesn't require an argument.

    Each directive is the name of a ``str`` method prefixed with ``%``. The
    method is called on every item without arguments.
    """

    def __call__(self, stream):
        # Drop the leading '%' to get the method name.
        method_name = self.directive[1:]
        call_method = op.methodcaller(method_name)
        return map(call_method, stream)
class OpStrOneArg(OpBase, directives=(
        '%join', '%splits',
        '%partition', '%rpartition',
        '%strips', '%lstrips', '%rstrips')):
    # Possibly differentiating between things like '%strip' and '%strips' is
    # too much, and instead we should just have '%strip string'?
    """Like ``OpStrNoArgs()`` but for methods requiring one argument.

    Directives map directly to ``str`` methods. Note that some of these
    directives are very similar to those implemented by ``OpStrNoArgs()``,
    but without a default value.

    ``%join`` is the exception: the argument is the separator, and each item
    in the stream is the iterable being joined.
    """

    def __init__(self, directive: str, argument: str, /, **kwargs):
        """
        :param str directive:
            Working with this directive.
        :param str argument:
            For ``str`` method.
        :param **kwargs kwargs:
            For parent implementation.
        """

        super().__init__(directive, **kwargs)
        self.argument = argument

    def __call__(self, stream):

        # Directives with a trailing 's' that distinguishes them from the
        # argument-free directives of the same name. Anything not listed here
        # is just the method name prefixed with '%'. Only real 'str' methods
        # belong here. Note that '%rsplits' is not among the directives
        # registered by this class.
        mapping = {
            '%strips': 'strip',
            '%lstrips': 'lstrip',
            '%rstrips': 'rstrip',
            '%splits': 'split',
            '%rsplits': 'rsplit',
        }

        method_name = mapping.get(self.directive, self.directive[1:])

        # For joining, the argument is the object whose method is called and
        # the item is what gets passed to it - the reverse of everything
        # else.
        if method_name == 'join':
            return map(self.argument.join, stream)

        func = op.methodcaller(method_name, self.argument)
        return map(func, stream)
class OpReplace(OpBase, directives=('%replace', )):
    """Replace a portion of each item with a new string via ``replace()``."""

    def __init__(self, directive: str, old: str, new: str, /, **kwargs):
        """
        :param str directive:
            Currently active directive.
        :param str old:
            Replace all occurrences of this substring with ``new``.
        :param str new:
            See ``old``.
        :param **kwargs kwargs:
            See parent implementation.
        """

        super().__init__(directive, **kwargs)
        self.old, self.new = old, new

    def __call__(self, stream):
        # Calls 'item.replace(old, new)' on every item.
        replace = op.methodcaller('replace', self.old, self.new)
        return map(replace, stream)