Path: blob/trunk/third_party/closure/goog/net/streams/pbstreamparser.js
1865 views
// Copyright 2015 The Closure Library Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS-IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
 * @fileoverview The default Protobuf stream parser.
 *
 * The default Protobuf parser decodes the input stream (binary) under the
 * following rules:
 * 1. The data stream as a whole represents a valid proto message,
 *    defined as following:
 *
 *    message StreamBody {
 *      repeated bytes messages = 1;
 *      google.rpc.Status status = 2;
 *      repeated bytes padding = 15;
 *    }
 *
 *    Padding messages are no-op messages that may be generated as base64
 *    padding (for browsers) or as a way to keep the connection alive. Their
 *    tag-id is reserved as the maximum value allowed for a single-byte tag-id.
 *
 * 2. The only things that are significant to this parser in the above
 *    definition are the specification of the tag ids and wire types (all fields
 *    having length-delimited wire type). The parser doesn't fail if status
 *    appears more than once, i.e. the validity of StreamBody (other than tag
 *    ids and wire types) is not checked.
 *
 * 3. The wire format looks like:
 *
 *    (<tag-id> <wire-type> <length> <message-bytes>)... EOF
 *
 *    For details of Protobuf wire format see
 *    https://developers.google.com/protocol-buffers/docs/encoding
 *
 *    A message with unknown tag or with length larger than 2^32 - 1 will
 *    invalidate the whole stream.
 *
 * 4. All decoded messages and status in the buffer will be delivered in
 *    a batch (array), with each constructed as {tag-id: opaque-byte-array}.
 *    No-op data, e.g. padding, will be immediately discarded.
 *
 * 5. If a high-level API does not support batch delivery (e.g. grpc), then
 *    a wrapper is expected to deliver individual message separately in order.
 */

goog.provide('goog.net.streams.PbStreamParser');

goog.require('goog.asserts');
goog.require('goog.net.streams.StreamParser');

goog.scope(function() {


/**
 * The default Protobuf stream parser.
 *
 * The parser is a byte-at-a-time state machine; all state lives on the
 * instance so that a message may be split across several parse() calls.
 *
 * @constructor
 * @struct
 * @implements {goog.net.streams.StreamParser}
 * @final
 */
goog.net.streams.PbStreamParser = function() {
  /**
   * The current error message, if any.
   * @private {?string}
   */
  this.errorMessage_ = null;

  /**
   * The currently buffered result (parsed messages).
   * @private {!Array<!Object>}
   */
  this.result_ = [];

  /**
   * The current position in the streamed data.
   * @private {number}
   */
  this.streamPos_ = 0;

  /**
   * The current parser state.
   * @private {goog.net.streams.PbStreamParser.State_}
   */
  this.state_ = Parser.State_.INIT;

  /**
   * The tag of the proto message being parsed.
   * @private {number}
   */
  this.tag_ = 0;

  /**
   * The length of the proto message being parsed.
   * @private {number}
   */
  this.length_ = 0;

  /**
   * Count of processed length bytes.
   * @private {number}
   */
  this.countLengthBytes_ = 0;

  /**
   * Raw bytes of the current message. Uses Uint8Array by default. Falls back to
   * native array when Uint8Array is unsupported.
   * @private {?Uint8Array|?Array<number>}
   */
  this.messageBuffer_ = null;

  /**
   * Count of processed message bytes.
   * @private {number}
   */
  this.countMessageBytes_ = 0;
};


var Parser = goog.net.streams.PbStreamParser;


/**
 * The parser state.
 * @private @enum {number}
 */
Parser.State_ = {
  INIT: 0,     // expecting the tag:wire-type byte
  LENGTH: 1,   // expecting more varint bytes of length
  MESSAGE: 2,  // expecting more message bytes
  INVALID: 3   // error_() has been called; every further byte is rejected
};


/**
 * Tag of padding messages.
 * @private @const {number}
 */
Parser.PADDING_TAG_ = 15;


/**
 * @override
 */
goog.net.streams.PbStreamParser.prototype.isInputValid = function() {
  return this.state_ != Parser.State_.INVALID;
};


/**
 * @override
 */
goog.net.streams.PbStreamParser.prototype.getErrorMessage = function() {
  return this.errorMessage_;
};


/**
 * Marks the stream as invalid, records the error message and throws.
 *
 * @param {!Uint8Array|!Array<number>} inputBytes The current input buffer
 * @param {number} pos The position in the current input that triggers the error
 * @param {string} errorMsg Additional error message
 * @throws {!Error} Throws an error indicating where the stream is broken
 * @private
 */
Parser.prototype.error_ = function(inputBytes, pos, errorMsg) {
  this.state_ = Parser.State_.INVALID;
  this.errorMessage_ = 'The stream is broken @' + this.streamPos_ + '/' + pos +
      '. ' +
      'Error: ' + errorMsg + '. ' +
      'With input:\n' + inputBytes;
  throw new Error(this.errorMessage_);
};


/**
 * Feeds one chunk of the stream to the parser, one byte at a time.
 *
 * Messages that are completed by this chunk are returned as an array of
 * single-key objects ({tag: bytes}); null is returned when no message was
 * completed. A message that is only partially contained in the chunk is kept
 * in the parser state and continued by the next call. Padding messages are
 * consumed but never returned.
 *
 * @throws {!Error} Throws an error message if the input is invalid.
 * @override
 */
goog.net.streams.PbStreamParser.prototype.parse = function(input) {
  goog.asserts.assert(input instanceof Array || input instanceof ArrayBuffer);

  var parser = this;
  var inputBytes = (input instanceof Array) ? input : new Uint8Array(input);
  var pos = 0;

  while (pos < inputBytes.length) {
    switch (parser.state_) {
      case Parser.State_.INVALID: {
        parser.error_(inputBytes, pos, 'stream already broken');
        break;
      }
      case Parser.State_.INIT: {
        processTagByte(inputBytes[pos]);
        break;
      }
      case Parser.State_.LENGTH: {
        processLengthByte(inputBytes[pos]);
        break;
      }
      case Parser.State_.MESSAGE: {
        processMessageByte(inputBytes[pos]);
        break;
      }
      default: { throw Error('unexpected parser state: ' + parser.state_); }
    }

    // streamPos_ is never reset, so it counts bytes across parse() calls;
    // pos is the offset inside the current chunk only.
    parser.streamPos_++;
    pos++;
  }

  // Hand the finished messages to the caller and start a fresh batch.
  var msgs = parser.result_;
  parser.result_ = [];
  return msgs.length > 0 ? msgs : null;

  /**
   * Validates the tag:wire-type byte that starts a field and switches the
   * parser to the LENGTH state.
   *
   * @param {number} b A tag byte to process
   */
  function processTagByte(b) {
    if (b & 0x80) {  // continuation bit set: the tag needs more than one byte
      parser.error_(inputBytes, pos, 'invalid tag');
    }

    var wireType = b & 0x07;
    if (wireType != 2) {  // only length-delimited fields are accepted
      parser.error_(inputBytes, pos, 'invalid wire type');
    }

    parser.tag_ = b >>> 3;
    if (parser.tag_ != 1 && parser.tag_ != 2 &&
        parser.tag_ != Parser.PADDING_TAG_) {
      parser.error_(inputBytes, pos, 'unexpected tag');
    }

    parser.state_ = Parser.State_.LENGTH;
    parser.length_ = 0;
    parser.countLengthBytes_ = 0;
  }

  /**
   * Accumulates one byte of the varint-encoded message length. When the last
   * length byte has been seen, allocates the message buffer and switches the
   * parser to the MESSAGE state (or finishes the message right away when the
   * length is zero).
   *
   * @param {number} b A length byte to process
   */
  function processLengthByte(b) {
    parser.countLengthBytes_++;
    if (parser.countLengthBytes_ == 5) {
      // The first four bytes carry 28 bits, so the fifth may only add 4 more.
      // 0xF0 also covers the continuation bit, so a sixth byte is rejected.
      if (b & 0xF0) {  // length will not fit in a 32-bit uint
        parser.error_(inputBytes, pos, 'message length too long');
      }
    }

    // Use arithmetic rather than `|=` / `<<`: the bitwise operators work on
    // signed 32-bit integers, so a length >= 2^31 would turn negative. Every
    // 7-bit group lands on distinct bits, so addition is equivalent to OR.
    parser.length_ +=
        (b & 0x7F) * Math.pow(2, (parser.countLengthBytes_ - 1) * 7);

    if (!(b & 0x80)) {  // no more length byte
      parser.state_ = Parser.State_.MESSAGE;
      parser.countMessageBytes_ = 0;
      try {
        if (typeof Uint8Array !== 'undefined') {
          parser.messageBuffer_ = new Uint8Array(parser.length_);
        } else {
          parser.messageBuffer_ = new Array(parser.length_);
        }
      } catch (e) {
        // The length comes from the wire; report an allocation failure as a
        // broken stream so that the parser state is marked INVALID.
        parser.error_(
            inputBytes, pos,
            'cannot allocate message buffer of ' + parser.length_ + ' bytes');
      }

      if (parser.length_ == 0) {  // empty message
        finishMessage();
      }
    }
  }

  /**
   * Appends one payload byte to the current message and finishes the message
   * once the declared length has been reached.
   *
   * @param {number} b A message byte to process
   */
  function processMessageByte(b) {
    parser.messageBuffer_[parser.countMessageBytes_++] = b;
    if (parser.countMessageBytes_ == parser.length_) {
      finishMessage();
    }
  }

  /**
   * Finishes up building the current message and resets parser state.
   * Padding messages are dropped instead of being added to the result.
   */
  function finishMessage() {
    if (parser.tag_ < Parser.PADDING_TAG_) {
      var message = {};
      message[parser.tag_] = parser.messageBuffer_;
      parser.result_.push(message);
    }
    parser.state_ = Parser.State_.INIT;
  }
};


});  // goog.scope