-
Notifications
You must be signed in to change notification settings - Fork 0
/
Reader.m
executable file
·124 lines (107 loc) · 3.67 KB
/
Reader.m
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
//
// Created by chris on 6/17/13.
//
#import "Reader.h"
#import "NSData+EnumerateComponents.h"
// Private state of Reader. The class acts as the delegate of its own input
// stream and hands the received bytes to a serial operation queue.
@interface Reader () <NSStreamDelegate>
// Stream currently being read; nil while no enumeration is in progress.
@property (nonatomic, strong) NSInputStream* inputStream;
// File URL handed to -initWithFileAtURL:.
@property (nonatomic, strong) NSURL *fileURL;
// Byte sequence that separates two lines (the UTF-8 encoding of "\n").
@property (nonatomic, copy) NSData *delimiter;
// Bytes received so far that have not been emitted as a line yet, i.e. the
// trailing partial line carried over from one chunk to the next.
@property (nonatomic, strong) NSMutableData *remainder;
// Per-line block passed to -enumerateLinesWithBlock:completionHandler:.
@property (nonatomic, copy) void (^callback) (NSUInteger lineNumber, NSString* line);
// Block invoked on `queue` once the end of the stream has been reached.
@property (nonatomic, copy) void (^completion) (NSUInteger numberOfLines);
// Number that will be assigned to the next emitted line.
@property (nonatomic) NSUInteger lineNumber;
// Serial queue (maxConcurrentOperationCount == 1) on which data chunks are
// processed and the completion block is called.
@property (nonatomic, strong) NSOperationQueue *queue;
@end
@implementation Reader
/**
 * Starts reading the file asynchronously and reports its contents line by line.
 *
 * The input stream is scheduled on the run loop of the calling thread in the
 * default mode, so that run loop has to be running for any data to arrive.
 * Line processing itself happens on a private serial operation queue, which is
 * also where both blocks are invoked.
 *
 * @param block      Called for every non-empty line with its zero-based line
 *                   number and the line decoded as UTF-8.
 * @param completion Called after the end of the file has been reached.
 *
 * Only one enumeration may be in flight at a time (asserted). The reader can be
 * reused once the previous enumeration has finished.
 */
- (void)enumerateLinesWithBlock:(void (^)(NSUInteger lineNumber, NSString *line))block completionHandler:(void (^)(NSUInteger numberOfLines))completion;
{
    if (self.queue == nil) {
        // Created lazily; it must be serial so that chunks are processed in order.
        self.queue = [[NSOperationQueue alloc] init];
        self.queue.maxConcurrentOperationCount = 1;
    }
    NSAssert(self.queue.maxConcurrentOperationCount == 1, @"Queue can't be concurrent.");
    NSAssert(self.inputStream == nil, @"Cannot process multiple input streams in parallel");

    // Start every enumeration from a clean slate. Without this a second run
    // would continue the numbering of the previous one and could prepend a
    // partial line left behind by an aborted read.
    self.lineNumber = 0;
    self.remainder = nil;

    self.callback = block;
    self.completion = completion;

    self.inputStream = [NSInputStream inputStreamWithURL:self.fileURL];
    self.inputStream.delegate = self;
    [self.inputStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
    [self.inputStream open];
}
/**
 * Creates a reader for the file at the given URL.
 *
 * @param fileURL URL of the file to read. It has to be a file URL.
 * @return The initialized reader, or nil when `fileURL` is not a file URL
 *         (which includes a nil URL) or when the superclass initializer fails.
 */
- (id)initWithFileAtURL:(NSURL *)fileURL;
{
    BOOL pointsToFile = [fileURL isFileURL];
    if (!pointsToFile) {
        return nil;
    }

    if ((self = [super init]) == nil) {
        return nil;
    }

    // Lines are separated by a single line feed.
    NSData *lineFeed = [@"\n" dataUsingEncoding:NSUTF8StringEncoding];
    self.fileURL = fileURL;
    self.delimiter = lineFeed;
    return self;
}
/**
 * NSStreamDelegate callback that drives the read loop.
 *
 * It is delivered on the run loop the stream was scheduled on. Everything that
 * touches `remainder` or emits lines is forwarded to the serial `queue`, so the
 * run loop thread only reads bytes and manages the stream itself.
 *
 * @param stream    The stream that produced the event.
 * @param eventCode The event to handle.
 */
- (void)stream:(NSStream*)stream handleEvent:(NSStreamEvent)eventCode
{
    switch (eventCode) {
        case NSStreamEventOpenCompleted: {
            break;
        }
        case NSStreamEventEndEncountered: {
            [self.inputStream close];
            self.inputStream = nil;
            // Flush the trailing partial line on the serial queue, not here:
            // chunk operations may still be pending, and handling the remainder
            // on this thread would race with them and could emit the last line
            // out of order or lose it.
            [self.queue addOperationWithBlock:^{
                [self emitLineWithData:self.remainder];
                self.remainder = nil;
                void (^completion)(NSUInteger) = self.completion;
                if (completion != nil) {
                    // NOTE(review): -emitLineWithData: already advances
                    // lineNumber past the last line, so "+ 1" looks like it
                    // over-counts. Kept as-is for existing callers - confirm.
                    completion(self.lineNumber + 1);
                }
            }];
            break;
        }
        case NSStreamEventErrorOccurred: {
            NSLog(@"error: %@", [stream streamError]);
            // Release the stream so it does not stay open and the reader is not
            // stuck in the "busy" state forever.
            // TODO: the completion handler is not invoked here because the API
            // offers no way to report a failure to the caller.
            [self.inputStream close];
            self.inputStream = nil;
            break;
        }
        case NSStreamEventHasBytesAvailable: {
            NSMutableData *buffer = [NSMutableData dataWithLength:4 * 1024];
            // -read:maxLength: returns a signed count (-1 on error, 0 at the
            // end). It must stay signed: converted to NSUInteger, -1 would pass
            // the check below and be used as the new buffer length.
            NSInteger bytesRead = [self.inputStream read:[buffer mutableBytes] maxLength:[buffer length]];
            if (0 < bytesRead) {
                [buffer setLength:(NSUInteger)bytesRead];
                __weak id weakSelf = self;
                [self.queue addOperationWithBlock:^{
                    [weakSelf processDataChunk:buffer];
                }];
            }
            break;
        }
        default: {
            break;
        }
    }
}
/**
 * Merges a freshly read chunk with the bytes left over from earlier chunks and
 * emits every complete line contained in the result.
 *
 * @param buffer The bytes that were just read. When nothing is pending, this
 *               object itself becomes the new remainder.
 */
- (void)processDataChunk:(NSMutableData *)buffer;
{
    NSMutableData *pending = self.remainder;
    if (pending == nil) {
        pending = buffer;
        self.remainder = pending;
    } else {
        [pending appendData:buffer];
    }

    // obj_enumerateComponentsSeparatedBy:usingBlock: comes from the
    // NSData+EnumerateComponents category, which is not visible here.
    // Presumably it calls the block once per delimiter-separated component and
    // flags the final one - verify against the category.
    [pending obj_enumerateComponentsSeparatedBy:self.delimiter usingBlock:^(NSData *piece, BOOL isFinalPiece) {
        if (!isFinalPiece) {
            [self emitLineWithData:piece];
            return;
        }
        // The final piece is not terminated by a delimiter yet: keep it until
        // more data arrives, or drop the remainder when it is empty.
        self.remainder = (0 < [piece length]) ? [piece mutableCopy] : nil;
    }];
}
/**
 * Assigns the next line number to `data` and, unless the line is empty, hands
 * it to the per-line callback.
 *
 * The line number advances on every call, so empty lines are not reported but
 * still take up a number.
 *
 * @param data Raw bytes of one line without the delimiter. May be nil or empty.
 *             The bytes are decoded as UTF-8; if they are not valid UTF-8 the
 *             callback receives a nil line.
 */
- (void)emitLineWithData:(NSData *)data;
{
    NSUInteger lineNumber = self.lineNumber;
    self.lineNumber = lineNumber + 1;
    if (0 == data.length) {
        return;
    }
    // Invoking a nil block crashes, so tolerate an enumeration that was started
    // without a per-line block.
    void (^callback)(NSUInteger, NSString *) = self.callback;
    if (callback == nil) {
        return;
    }
    NSString *line = [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding];
    callback(lineNumber, line);
}
@end