Fix crash for web socket in some race conditions (#22439)

Summary:
Fixes #21086.
Fixes #6117.

This PR fixes a crash caused by a race condition when `webSocket` deallocated and `NSStream` delegate callback, because `NSStream`'s delegate callback be called on `RCTSR_networkRunLoop`.

This PR mainly changes:

* Remove unnecessary `nil` operation in `dealloc` method.
* Add a new method `_scheduleCleanUp` to schedule `webSocket` cleanup also on `RCTSR_networkRunLoop`.
* In `stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode` delegate method, add a `wself` to make safe further.
Pull Request resolved: https://github.com/facebook/react-native/pull/22439

Differential Revision: D13564247

Pulled By: cpojer

fbshipit-source-id: 675c1b2805aa45c54d7708d796f5843ef7ea34e2
This commit is contained in:
zhongwuzw
2019-01-01 17:52:42 -08:00
committed by Facebook Github Bot
parent 2c3f807ace
commit dd209bb789

View File

@@ -217,6 +217,8 @@ typedef void (^data_callback)(RCTSRWebSocket *webSocket, NSData *data);
int _closeCode;
BOOL _isPumping;
BOOL _cleanupScheduled;
NSMutableSet<NSArray *> *_scheduledRunloops;
@@ -324,17 +326,11 @@ RCT_NOT_IMPLEMENTED(- (instancetype)init)
[_inputStream close];
[_outputStream close];
_workQueue = NULL;
if (_receivedHTTPHeaders) {
CFRelease(_receivedHTTPHeaders);
_receivedHTTPHeaders = NULL;
}
if (_delegateDispatchQueue) {
_delegateDispatchQueue = NULL;
}
}
#ifndef NDEBUG
@@ -626,11 +622,11 @@ RCT_NOT_IMPLEMENTED(- (instancetype)init)
}];
self.readyState = RCTSR_CLOSED;
self->_selfRetain = nil;
RCTSRLog(@"Failing with error %@", error.localizedDescription);
[self _disconnect];
[self _scheduleCleanup];
}
});
}
@@ -1036,12 +1032,7 @@ static const uint8_t RCTSRPayloadLenMask = 0x7F;
!_sentClose) {
_sentClose = YES;
[_outputStream close];
[_inputStream close];
for (NSArray *runLoop in [_scheduledRunloops copy]) {
[self unscheduleFromRunLoop:runLoop[0] forMode:runLoop[1]];
}
[self _scheduleCleanup];
if (!_failed) {
[self _performDelegateBlock:^{
@@ -1050,8 +1041,6 @@ static const uint8_t RCTSRPayloadLenMask = 0x7F;
}
}];
}
_selfRetain = nil;
}
}
@@ -1345,94 +1334,142 @@ static const size_t RCTSRFrameHeaderOverhead = 32;
}
}
}
assert(_workQueue != NULL);
// _workQueue cannot be NULL
if (!_workQueue) {
return;
}
__weak typeof(self) weakSelf = self;
dispatch_async(_workQueue, ^{
switch (eventCode) {
case NSStreamEventOpenCompleted: {
RCTSRLog(@"NSStreamEventOpenCompleted %@", aStream);
if (self.readyState >= RCTSR_CLOSING) {
return;
}
assert(self->_readBuffer);
if (self.readyState == RCTSR_CONNECTING && aStream == self->_inputStream) {
[self didConnect];
}
[self _pumpWriting];
[self _pumpScanner];
break;
}
case NSStreamEventErrorOccurred: {
RCTSRLog(@"NSStreamEventErrorOccurred %@ %@", aStream, [aStream.streamError copy]);
// TODO: specify error better!
[self _failWithError:aStream.streamError];
self->_readBufferOffset = 0;
self->_readBuffer.length = 0;
break;
}
case NSStreamEventEndEncountered: {
[self _pumpScanner];
RCTSRLog(@"NSStreamEventEndEncountered %@", aStream);
if (aStream.streamError) {
[self _failWithError:aStream.streamError];
} else {
dispatch_async(self->_workQueue, ^{
if (self.readyState != RCTSR_CLOSED) {
self.readyState = RCTSR_CLOSED;
self->_selfRetain = nil;
}
if (!self->_sentClose && !self->_failed) {
self->_sentClose = YES;
// If we get closed in this state it's probably not clean because we should be sending this when we send messages
[self _performDelegateBlock:^{
if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {
[self.delegate webSocket:self didCloseWithCode:RCTSRStatusCodeGoingAway reason:@"Stream end encountered" wasClean:NO];
}
}];
}
});
}
break;
}
case NSStreamEventHasBytesAvailable: {
RCTSRLog(@"NSStreamEventHasBytesAvailable %@", aStream);
const int bufferSize = 2048;
uint8_t buffer[bufferSize];
while (self->_inputStream.hasBytesAvailable) {
NSInteger bytes_read = [self->_inputStream read:buffer maxLength:bufferSize];
if (bytes_read > 0) {
[self->_readBuffer appendBytes:buffer length:bytes_read];
} else if (bytes_read < 0) {
[self _failWithError:self->_inputStream.streamError];
}
if (bytes_read != bufferSize) {
break;
}
};
[self _pumpScanner];
break;
}
case NSStreamEventHasSpaceAvailable: {
RCTSRLog(@"NSStreamEventHasSpaceAvailable %@", aStream);
[self _pumpWriting];
break;
}
default:
RCTSRLog(@"(default) %@", aStream);
break;
typeof(self) strongSelf = weakSelf;
if (!strongSelf) {
return;
}
[strongSelf safeHandleEvent:eventCode stream:aStream];
});
}
- (void)safeHandleEvent:(NSStreamEvent)eventCode stream:(NSStream *)aStream
{
switch (eventCode) {
case NSStreamEventOpenCompleted: {
RCTSRLog(@"NSStreamEventOpenCompleted %@", aStream);
if (self.readyState >= RCTSR_CLOSING) {
return;
}
assert(self->_readBuffer);
if (self.readyState == RCTSR_CONNECTING && aStream == self->_inputStream) {
[self didConnect];
}
[self _pumpWriting];
[self _pumpScanner];
break;
}
case NSStreamEventErrorOccurred: {
RCTSRLog(@"NSStreamEventErrorOccurred %@ %@", aStream, [aStream.streamError copy]);
// TODO: specify error better!
[self _failWithError:aStream.streamError];
self->_readBufferOffset = 0;
self->_readBuffer.length = 0;
break;
}
case NSStreamEventEndEncountered: {
[self _pumpScanner];
RCTSRLog(@"NSStreamEventEndEncountered %@", aStream);
if (aStream.streamError) {
[self _failWithError:aStream.streamError];
} else {
dispatch_async(self->_workQueue, ^{
if (self.readyState != RCTSR_CLOSED) {
self.readyState = RCTSR_CLOSED;
[self _scheduleCleanup];
}
if (!self->_sentClose && !self->_failed) {
self->_sentClose = YES;
// If we get closed in this state it's probably not clean because we should be sending this when we send messages
[self _performDelegateBlock:^{
if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {
[self.delegate webSocket:self didCloseWithCode:RCTSRStatusCodeGoingAway reason:@"Stream end encountered" wasClean:NO];
}
}];
}
});
}
break;
}
case NSStreamEventHasBytesAvailable: {
RCTSRLog(@"NSStreamEventHasBytesAvailable %@", aStream);
const int bufferSize = 2048;
uint8_t buffer[bufferSize];
while (self->_inputStream.hasBytesAvailable) {
NSInteger bytes_read = [self->_inputStream read:buffer maxLength:bufferSize];
if (bytes_read > 0) {
[self->_readBuffer appendBytes:buffer length:bytes_read];
} else if (bytes_read < 0) {
[self _failWithError:self->_inputStream.streamError];
}
if (bytes_read != bufferSize) {
break;
}
};
[self _pumpScanner];
break;
}
case NSStreamEventHasSpaceAvailable: {
RCTSRLog(@"NSStreamEventHasSpaceAvailable %@", aStream);
[self _pumpWriting];
break;
}
default:
RCTSRLog(@"(default) %@", aStream);
break;
}
}
- (void)_scheduleCleanup
{
if (_cleanupScheduled) {
return;
}
_cleanupScheduled = YES;
// Cleanup NSStream's delegate in the same RunLoop used by the streams themselves:
// This way we'll prevent race conditions between handleEvent and SRWebsocket's dealloc
NSTimer *timer = [NSTimer timerWithTimeInterval:(0.0f) target:self selector:@selector(_cleanupSelfReference:) userInfo:nil repeats:NO];
[[NSRunLoop RCTSR_networkRunLoop] addTimer:timer forMode:NSDefaultRunLoopMode];
}
- (void)_cleanupSelfReference:(NSTimer *)timer
{
// Remove the streams, right now, from the networkRunLoop
[_inputStream close];
[_outputStream close];
// Unschedule from RunLoop
for (NSArray *runLoop in [_scheduledRunloops copy]) {
[self unscheduleFromRunLoop:runLoop[0] forMode:runLoop[1]];
}
// Nuke NSStream's delegate
_inputStream.delegate = nil;
_outputStream.delegate = nil;
// Cleanup selfRetain in the same GCD queue as usual
dispatch_async(_workQueue, ^{
self->_selfRetain = nil;
});
}