data limit to sockets? data events stop firing!
I have a pretty elaborate class that extends Socket and performs some serialization and data protocol stuff. It works pretty well when I send small messages. However, when I send anything over about 132KB at once, the SOCKET_DATA progress events stop firing -- or at least the function I've got listening for them does. My class is called RPCSocket and I assign the socketDataHandler function like this in my constructor:
addEventListener(ProgressEvent.SOCKET_DATA, socketDataHandler);
My socketDataHandler function (with lots of comments) is attached. Is there some limit to how much data can be sent over a socket at once? Is there something wrong with my code? Here's the trace output:
Code:
socketdatahandler
bytesAvail:4380
buffer length:0
reading bytes into buffer for concat
buffer length after concat:4380
buffer state:0
BUFFER state is ready
and we got enough bytes: 4380
setting mesg length to 500040
buffer state set to 1
trimming 4 bytes off buffer
buffer length:4376
the big check
end of message peel loop
socket data handler end
socketdatahandler
bytesAvail:8760
buffer length:4376
reading bytes into buffer for concat
buffer length after concat:13136
buffer state:1
the big check
end of message peel loop
socket data handler end
socketdatahandler
bytesAvail:64240
buffer length:13136
reading bytes into buffer for concat
buffer length after concat:77376
buffer state:1
the big check
end of message peel loop
socket data handler end
socketdatahandler
bytesAvail:54020
buffer length:77376
reading bytes into buffer for concat
buffer length after concat:131396
buffer state:1
the big check
end of message peel loop
socket data handler end
PHP Code:
private function socketDataHandler(evt:ProgressEvent):void {
trace('socketdatahandler');
var tmpBuf:ByteArray;
var messageBytes:ByteArray;
log.write('RPCSocket.socketDataHandler invoked:' + evt, Log.LOW);
var bytesAvail:int = super.bytesAvailable;
trace('bytesAvail:' + bytesAvail);
trace('buffer length:' + buffer.length);
var totalLength:int = buffer.length + bytesAvail;
if (totalLength > RPCSocket.MAX_BUFFER_LENGTH) {
// read all the socket data and dump it
tmpBuf = new ByteArray();
trace('reading all bytes to empty buffer');
super.readBytes(tmpBuf, 0, bytesAvail);
tmpBuf = null;
resetBuffer();
log.write('RPCSocket.socketDataHandler failed. Buffer length of ' +
totalLength + ' exceeds maximum allowed of ' + RPCSocket.MAX_BUFFER_LENGTH +
' bytes. Buffer reset.', Log.HIGH);
dispatchEvent(new IOErrorEvent(IOErrorEvent.IO_ERROR, false, false,
'RPCSocket.socketDataHandler failed. Maximum buffer length of ' + totalLength + ' exceeds
maximum allowed of ' + RPCSocket.MAX_BUFFER_LENGTH + ' bytes. Buffer reset.'));
return;
}
// otherwise, add all the socket's available data to the buffer
trace('reading bytes into buffer for concat');
super.readBytes(buffer, buffer.length);
trace('buffer length after concat:' + buffer.length);
// try to peel off any messages that might live on the buffer
do {
trace('buffer state:' + bufferState);
switch(bufferState) {
case RPCSocket.BUFFER_STATE_READY :
trace('BUFFER state is ready');
if (buffer.length < RPCSocket.MESSAGE_LENGTH_INDICATOR_BYTES) {
return; // not enough to do anything yet
}
trace("\tand we got enough bytes: " + buffer.length);
// otherwise, we can at least read the incoming message length
incomingMessageLength = buffer.readUnsignedInt();
trace('setting mesg length to ' + incomingMessageLength);
// sanity check
if (incomingMessageLength == 0) {
trace('DAMN have to reset buffer');
resetBuffer();
log.write('Incoming message length of zero. Flushing buffer.', Log.HIGH);
return;
}
bufferState = RPCSocket.BUFFER_STATE_READING;
trace('buffer state set to ' + RPCSocket.BUFFER_STATE_READING);
// NOTE - at this point, the buffer's position has advanced
// let's trim those few bytes off the beginning
trace('trimming 4 bytes off buffer');
tmpBuf = new ByteArray();
buffer.readBytes(tmpBuf, 0);
buffer = new ByteArray();
tmpBuf.readBytes(buffer, 0);
tmpBuf = null;
trace('buffer length:' + buffer.length);
break;
case RPCSocket.BUFFER_STATE_READING :
// nothing to do here...might be some day
break;
default:
resetBuffer();
log.write('Unexpected buffer state. Flushing buffer.', Log.HIGH);
return;
} // switch bufferState
if (bufferState == RPCSocket.BUFFER_STATE_READY) {
trace('buffer state is ready, returning');
return;
}
trace('the big check');
if ((bufferState == RPCSocket.BUFFER_STATE_READING) && (buffer.length >= incomingMessageLength)) {
trace('we in now');
// we have our message! peel it off
// read the message's bytes into a new array
messageBytes = new ByteArray();
trace('reading bytes into MessageBytes');
buffer.readBytes(messageBytes, 0, incomingMessageLength);
// truncate the buffer
trace('truncating buffer after message bytes out');
tmpBuf = new ByteArray();
buffer.readBytes(tmpBuf, 0);
buffer = new ByteArray();
tmpBuf.readBytes(buffer, 0);
trace('buffer length after trunc:' + buffer.length);
// reset the buffer state
bufferState = RPCSocket.BUFFER_STATE_READY;
trace('bufferstate set to ' + bufferState);
incomingMessageLength = 0;
// unserialize the message and execute the RPC therein
var incomingRPC:Array = unserialize(messageBytes);
if (!incomingRPC || (!(incomingRPC is Array))) {
log.write('RPCSocket.socketDataHandler failed. Data received
did not unserialize properly to an array.', Log.HIGH);
}
var serviceName:String = incomingRPC[0];
var eventName:String = incomingRPC[1];
if (!serviceName) {
// no service! dispatchEvent to all registered services
log.write('RPCSocket.socketDataHandler failed. RPC received
from server with no serviceName.', Log.HIGH);
dispatchEvent(new IOErrorEvent(IOErrorEvent.IO_ERROR,
false, false, 'RPCSocket.socketDataHandler failed. RPC received with no serviceName.'));
return;
}
var svc:FlashMOGService = services[serviceName];
if (!svc) {
// service doesn't exist!
log.write('RPCSocket.socketDataHandler failed. No service
named ' + serviceName + '.', Log.HIGH);
dispatchEvent(new IOErrorEvent(IOErrorEvent.IO_ERROR,
false, false, 'RPCSocket.socketDataHandler failed. No service named ' + serviceName + '.'));
return;
}
if (!eventName) {
// we have a serviceName but no event name!
// broadcast error to that one service only
log.write('RPCSocket.socketDataHandler failed. RPC received
with no eventName.', Log.HIGH);
svc.dispatchEvent(new IOErrorEvent(IOErrorEvent.IO_ERROR,
false, false, 'RPCSocket.socketDataHandler failed. RPC received with no eventName.'));
return;
}
var args:Array = incomingRPC[2];
svc.dispatchEvent(new FlashMOGDataEvent(FlashMOGDataEvent.DATA, false, false, args));
var methodFunc:Function = services[serviceName].client[eventName];
if (methodFunc is Function) {
methodFunc.apply(null, args);
}
} // if bufferState is READ and buffer.length >= incomingMessageLength
trace ('end of message peel loop');
} while ((bufferState == RPCSocket.BUFFER_STATE_READY) &&
(buffer.length > RPCSocket.MESSAGE_LENGTH_INDICATOR_BYTES));
log.write('RPCSocket.socketDataHandler end reached:' + evt, Log.LOW);
trace('socket data handler end');
}// function socketDataHandler()