
Memory Issue with bigQuery.createQueryStream in Node.js #1392

Closed
gayathri-mandala opened this issue Jul 10, 2024 · 6 comments
Assignees
Labels
api: bigquery Issues related to the googleapis/nodejs-bigquery API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments


gayathri-mandala commented Jul 10, 2024

bigQuery.createQueryStream loads the entire result set into memory. When we try to retrieve the data chunk by chunk, it causes a memory issue. Heap profiles show that most of the data is held in _cachedResponse, _cachedRows, and rows.

Environment details

  • OS: macOS sonoma 14.5
  • Node.js version: 18.12.1
  • npm version: 8.19.2
  • @google-cloud/bigquery version: 7.8.0

Steps to reproduce

Here is the sample script


const {BigQuery} = require('@google-cloud/bigquery');

// `creds` holds the BigQuery client options (credentials, projectId, ...).
const query = `Select * from table`;

async function queryBigQuery(query) {
  const bigquery = new BigQuery(creds);

  const queryStream = bigquery.createQueryStream(query);

  console.log('Query started.');

  let recordsBuffer = [];
  const batchSize = 100;

  // Process the stream
  queryStream
    .on('data', row => {
      recordsBuffer.push(row);
      if (recordsBuffer.length >= batchSize) {
        // Process the batch of records
        processBatch(recordsBuffer);
        // Reset the buffer so it does not grow without bound
        recordsBuffer = [];
      }
    })
    .on('end', () => {
      // Process any remaining records in the buffer
      if (recordsBuffer.length > 0) {
        processBatch(recordsBuffer);
      }
      console.log('Query completed.');
    })
    .on('error', err => {
      console.error('Error during query execution:', err);
    });
}

// Function to process a batch of records
function processBatch(batch) {
  console.log(`Processing batch of ${batch.length} records.`);
}

queryBigQuery(query).catch(console.error);

When we have multiple connections, the data for every connection request gets loaded into memory, so memory usage keeps increasing.

Issue with autoPaginate
I tried using the autoPaginate field: const queryStream = bigquery.createQueryStream(query, { autoPaginate: true });
However, it still behaves as if autoPaginate is set to false. Is there a way or field that allows us to retrieve the data in chunks rather than loading all of it into memory?
Reference
Here it is mentioned that we need to end the stream after a certain amount of data has been read. However, that approach could lead to data loss. How can we implement this correctly? Please provide a sample.
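
For reference, a sketch of one way to throttle consumption without ending the stream early, assuming a default-credentials client and a placeholder processBatch handler: pause the stream while a batch is processed, then resume. This only slows down how quickly rows are pulled; it does not change what the library caches internally.

const {BigQuery} = require('@google-cloud/bigquery');

async function queryWithBackpressure(query) {
  const bigquery = new BigQuery();
  const queryStream = bigquery.createQueryStream(query);

  let batch = [];
  const batchSize = 100;

  queryStream
    .on('data', row => {
      batch.push(row);
      if (batch.length >= batchSize) {
        // Pause the stream while the batch is being processed.
        queryStream.pause();
        Promise.resolve(processBatch(batch))
          .then(() => {
            batch = []; // release the processed rows
            queryStream.resume();
          })
          .catch(err => queryStream.destroy(err));
      }
    })
    .on('end', () => {
      if (batch.length > 0) {
        processBatch(batch);
      }
    })
    .on('error', err => console.error('Error during query execution:', err));
}

// Placeholder batch handler.
function processBatch(batch) {
  console.log(`Processing batch of ${batch.length} records.`);
}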

@gayathri-mandala gayathri-mandala added priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Jul 10, 2024
@product-auto-label product-auto-label bot added the api: bigquery Issues related to the googleapis/nodejs-bigquery API. label Jul 10, 2024
@alvarowolfx alvarowolfx self-assigned this Jul 10, 2024
@alvarowolfx
Contributor

hey @gayathri-mandala, thanks for the report. Unfortunately, using the BigQuery REST API we can't control the number of rows returned per page, so even when using the createQueryStream method we still fetch the query results in pages and keep every page in memory. As explained in #1073 (comment), createQueryStream just creates a streamified version of the paginated calls to the getQueryResults method.
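
To illustrate, here is a rough sketch of what those paginated calls look like when made by hand through the public API (not the library's internal code, and assuming default credentials and a placeholder processBatch handler). With autoPaginate disabled, each getQueryResults call resolves to one page of rows plus a nextQuery object carrying the pageToken for the next page, and every page is fully materialized in memory before your code sees it.

const {BigQuery} = require('@google-cloud/bigquery');

async function readQueryInPages(query) {
  const bigquery = new BigQuery();
  const [job] = await bigquery.createQueryJob({query});

  let options = {autoPaginate: false};
  while (options) {
    // Resolves to [rowsForThisPage, nextQueryOptionsOrNull, apiResponse].
    const [rows, nextQuery] = await job.getQueryResults(options);
    processBatch(rows); // the whole page is already buffered at this point
    options = nextQuery; // null when there are no more pages
  }
}

// Placeholder batch handler.
function processBatch(rows) {
  console.log(`Got a page of ${rows.length} rows`);
}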

I'm doing some work integrating the BigQuery Storage Read API, which allows reading a table using gRPC and the Arrow data format. That API also works in a true streaming fashion. The plan is to wrap up this work by the end of the month, and users will be able to use it transparently in this package. Or users will be able to use the BigQuery Storage Read wrapper to fetch tables more explicitly. You can keep track here:

You can already use the BigQuery Storage Read API, but there is no client wrapper yet, which makes the developer experience not super nice. Here is a basic sample of how to use it with the AVRO format via the raw client (but I advise using the ARROW format; I can provide a sample if you want to go down that route):

https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/quickstart.js

Also, I've just opened PR #1393 to reduce the memory footprint by clearing _cachedRows, _cachedResponse, and rows wherever they are no longer needed.

@alvarowolfx alvarowolfx added type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. and removed type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Jul 11, 2024
@gayathri-mandala
Author

Thanks for the information, @alvarowolfx.
Could you please provide a sample for the ARROW format?
Please update us when the open PR gets merged and released.

@alvarowolfx
Contributor

@gayathri-mandala here is a sample for reading data with the BigQuery Storage API in Arrow format. In this sample I show that you can either read a full table directly using the BQ Storage API, or run a query, obtain the destination table with the results, and then read that table with the BQ Storage API.

async function main() {
  // apache-arrow is used to decode the Arrow-encoded record batches returned by the read streams.
  const {RecordBatchReader} = require('apache-arrow');
  const {Readable} = require('stream');

  // See reference documentation at
  // https://cloud.google.com/bigquery/docs/reference/storage
  const {BigQuery} = require('@google-cloud/bigquery');
  const {BigQueryReadClient} = require('@google-cloud/bigquery-storage');

  const client = new BigQueryReadClient();

  async function bigqueryStorageQuickstart() {
    // Get current project ID. The read session is created in this project.
    // This project can be different from that which contains the table.
    const myProjectId = await client.getProjectId();
    
    // We can run a query job and use the destination table to read data from it.
    const bigquery = new BigQuery();
    const sqlQuery = 'SELECT name, number, state from `bigquery-public-data.usa_names.usa_1910_current` where state = "CA"';
    const [job] = await bigquery.createQueryJob({
      query: sqlQuery,
      location: 'US',
    });
    const [metadata] = await job.getMetadata();
    const qconfig = metadata.configuration.query;
    const dstTableRef = qconfig.destinationTable;
    const projectId = dstTableRef.projectId;
    const datasetId = dstTableRef.datasetId;
    const tableId = dstTableRef.tableId;
    
    // We can also use a fixed table.
    // This example reads baby name data from the public datasets.
    /*const projectId = 'bigquery-public-data';
    const datasetId = 'usa_names';
    const tableId = 'usa_1910_current';*/

    const tableReference = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;

    const parent = `projects/${myProjectId}`;

    /* We can limit the output columns to a subset of those in the table,
     * and optionally set a row restriction, e.g. to only report names from
     * the state of Washington (WA).
     */
    const readOptions = {
      selectedFields: [],
      // selectedFields: ['name', 'number', 'state'],
      // rowRestriction: 'state = "WA"',
    };

    // API request.
    const request = {
      parent,
      readSession: {
        table: tableReference,
        // This API can also deliver data serialized in Apache Avro format.
        // This example uses Apache Arrow.
        dataFormat: 'ARROW',
        readOptions,
      },
    };

    const [session] = await client.createReadSession(request);

    console.log(`session ${session.name} with ${session.streams.length} streams`);
    const streams = [];
    for (const readStream of session.streams) {      
      const readRowsRequest = {
        // Required stream name and optional offset. Offset requested must be less than
        // the last row read from readRows(). Requesting a larger offset is undefined.
        readStream: readStream.name,
        offset: 0,
      };
      const stream = client.readRows(readRowsRequest);
      streams.push(stream);
    }

    async function* mergeStreams(readables) {
      for (const readable of readables) {
        for await (const chunk of readable) {
          yield chunk;
        }
      }
    }
    
    const joined = Readable.from(mergeStreams(streams));

    const rows = [];

    joined
      .on('error', console.error)
      .on('data', data => {
        try {
          const buf = Buffer.concat([
            session.arrowSchema.serializedSchema,
            data.arrowRecordBatch.serializedRecordBatch,
          ]);
          const reader = RecordBatchReader.from(buf);
          const batches = reader.readAll();
          for (const batch of batches) {
            for (const row of batch.toArray()) {
              rows.push(row);
            }
          }
        } catch (error) {
          console.log(error);
        }
      })
      .on('end', () => {
        console.log(`Got ${rows.length} rows`);
      });
  }
  await bigqueryStorageQuickstart();
}

main(...process.argv.slice(2)).catch(console.error);
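
A note on the decoding step above: each readRows response carries only a serialized record batch, so the sample prepends the session's serializedSchema to every batch before handing the buffer to RecordBatchReader, since the Arrow IPC stream format expects a schema message before any record batches. Also, mergeStreams here drains the session's read streams one after another; if row ordering doesn't matter, the streams can be consumed in parallel for more throughput.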

@jcbowman

We are having the same memory issues using createQueryStream.

Is there an update on this to make the data truly streamable?

@alvarowolfx
Contributor

> We are having the same memory issues using createQueryStream.
>
> Is there an update on this to make the data truly streamable?

@jcbowman with PR #1393, memory usage is reduced, but it still doesn't make the call 100% streamable - as explained in #1392 (comment), that's not possible when using the BigQuery v2 REST API.

You can use the BQ Storage Read API, for which I added an example in #1392 (comment). We are also close to landing a wrapper that makes the BQ Storage Read API easier to use, with PR googleapis/nodejs-bigquery-storage#431.

@alvarowolfx
Contributor

As we can't make createQueryStream truly streamable without the BQ Storage Read API, I'll close this for now, since it is not something we can fix given the nature of the BigQuery v2 API.
