Dogs Chasing Squirrels

A software development blog

Monthly Archives: February 2024

Parsing streaming JSON in TypeScript

0

In our servers, we will write large volumes of data out to the response stream directly in order to avoid the memory pressure of having all the data in memory at once.

In JavaScript/TypeScript, although the fetch API supports streaming responses, there is no built-in streaming JSON parser in the browser today. As an exercise, I wrote a function that wraps a request in an AsyncGenerator to yield objects of some type T, where the response body consists of a JSON array of that type.
Note that the code yields parsed objects in batches. If every parsed object were yielded individually, the `for await...of` loop would create so many promises that performance would degrade terribly.

  /**
   * Given a response whose body is a JSON array of objects of type T, returns
   * an async generator that yields the parsed objects in batches of at most
   * `batchSize`.
   *
   * Objects are yielded in batches because yielding each object individually
   * creates one promise per object, which degrades performance badly.
   *
   * Limitation: top-level objects are delimited by counting '{' / '}' bytes,
   * so a brace inside a JSON string value will confuse the scanner.
   *
   * @param response  The fetch Response whose body stream will be read.
   * @param batchSize Maximum number of parsed objects per yielded batch.
   *                  If too small, the many async iterations hurt performance.
   */
  async function* parseStreaming3<T>(response: Response, batchSize: number = 100): AsyncGenerator<T[], void, undefined> {
    // If the response has no body, there is nothing to parse.  This will only
    // happen if something went wrong with the request.
    if (null === response.body) {
      console.warn(`Response has no body.`)
      return;
    }
    // The JSON object start character, '{'
    const START_OBJECT = 123;
    // The JSON object end character, '}'
    const END_OBJECT = 125;
    const decoder = new TextDecoder('utf-8');
    // Get a streaming reader for the response body
    const reader = response.body.getReader();
    // Current brace-nesting depth; 1 means "inside a top-level object".
    let depth = 0;
    // Raw bytes of an object that started in an earlier chunk (if any).
    // Kept as BYTES rather than decoded text so that a multi-byte UTF-8
    // character split across a chunk boundary is decoded correctly once the
    // object is complete.
    let previousBytes: Uint8Array | undefined = undefined;
    // Index in the CURRENT chunk where the current top-level object started.
    let startIndex: number | undefined = undefined;
    // The batch of parsed objects accumulated so far.
    let batch: T[] = [];
    try {
      // eslint-disable-next-line no-constant-condition
      while (true) {
        // Get the next chunk of bytes and whether the body is done
        const { done, value: bytes } = await reader.read();
        if (undefined !== bytes) {
          for (let i = 0; i < bytes.length; i++) {
            const byte = bytes[i];
            if (START_OBJECT === byte) {
              depth += 1;
              // Depth 1 means a new top-level object starts at this index.
              if (1 === depth) {
                startIndex = i;
              }
            } else if (END_OBJECT === byte) {
              // Depth 1 means this '}' closes a top-level object.
              if (1 === depth) {
                // Assemble the complete byte range of this object, prepending
                // any bytes carried over from earlier chunks, then decode ONCE
                // so multi-byte characters spanning chunks survive intact.
                const head = bytes.subarray(undefined !== previousBytes ? 0 : (startIndex ?? i), i + 1);
                let objectBytes: Uint8Array;
                if (undefined !== previousBytes) {
                  objectBytes = new Uint8Array(previousBytes.length + head.length);
                  objectBytes.set(previousBytes);
                  objectBytes.set(head, previousBytes.length);
                } else {
                  objectBytes = head;
                }
                try {
                  // Parse the JSON into an object of the given type
                  const obj: T = JSON.parse(decoder.decode(objectBytes));
                  batch.push(obj);
                } catch (e) {
                  // Skip the malformed object but keep reading the stream.
                  console.warn(e)
                  console.log(` - json = `, decoder.decode(objectBytes))
                }
                // Reset per-object state.  Resetting startIndex here is
                // essential: a stale index would make the end-of-chunk logic
                // below save garbage bytes as a partial object.
                previousBytes = undefined;
                startIndex = undefined;
                // If the batch is full, hand it to the consumer.
                if (batch.length === batchSize) {
                  yield batch;
                  batch = [];
                }
              }
              // Guard against going negative on malformed input.
              if (depth > 0) {
                depth -= 1;
              }
            }
          }
          // If a top-level object is still open at the end of this chunk,
          // carry its raw bytes over so they can be joined with the next read.
          if (undefined !== previousBytes) {
            // The open object began before this chunk (this whole chunk is a
            // middle section): append the entire chunk.
            const combined = new Uint8Array(previousBytes.length + bytes.length);
            combined.set(previousBytes);
            combined.set(bytes, previousBytes.length);
            previousBytes = combined;
          } else if (undefined !== startIndex) {
            // The open object began in this chunk: save its tail.  Use
            // slice() (a copy) so we do not hold a view onto the chunk buffer.
            previousBytes = bytes.slice(startIndex);
            startIndex = undefined;
          }
        }
        // End of the response body: no more data to read.
        if (done) {
          break;
        }
      }
      // Yield any objects remaining in a partial batch.
      if (batch.length > 0) {
        yield batch;
      }
    } finally {
      // Release the reader even if the consumer abandons the generator early.
      reader.releaseLock();
    }
  }