Read data from Google Cloud Datastore without ReadFromDatastore On Google Cloud Dataflow

TL;DR

  • Use Cloud Datastore’s helper method (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py#L155-L167) instead of google-cloud-datastore (https://pypi.org/project/google-cloud-datastore/).

I wanted to read data from Datastore before running an Apache Beam’s pipeline that uses ReadFromDatastore from apache_beam.io.gcp.datastore.v1.datastoreio on Cloud Dataflow. So first, I used google-cloud-datastore (https://pypi.org/project/google-cloud-datastore/) for reading data from Cloud Datastore, but using both ReadFromData (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py#L54) and google-cloud-datastore (https://pypi.org/project/google-cloud-datastore/) occurred error about conflicting of Protocol Buffers.

The error like this;

Traceback (most recent call last):
...
TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "google/cloud/datastore_v1/proto/entity.proto":
  google.datastore.v1.PartitionId.project_id: "google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.PartitionId.namespace_id: "google.datastore.v1.PartitionId.namespace_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.PartitionId: "google.datastore.v1.PartitionId" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.partition_id: "google.datastore.v1.Key.partition_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.path: "google.datastore.v1.Key.path" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.id_type: "google.datastore.v1.Key.PathElement.id_type" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.kind: "google.datastore.v1.Key.PathElement.kind" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.id: "google.datastore.v1.Key.PathElement.id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.name: "google.datastore.v1.Key.PathElement.name" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement: "google.datastore.v1.Key.PathElement" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key: "google.datastore.v1.Key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.ArrayValue.values: "google.datastore.v1.ArrayValue.values" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.ArrayValue: "google.datastore.v1.ArrayValue" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.value_type: "google.datastore.v1.Value.value_type" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.null_value: "google.datastore.v1.Value.null_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.boolean_value: "google.datastore.v1.Value.boolean_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.integer_value: "google.datastore.v1.Value.integer_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.double_value: "google.datastore.v1.Value.double_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.timestamp_value: "google.datastore.v1.Value.timestamp_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.key_value: "google.datastore.v1.Value.key_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.string_value: "google.datastore.v1.Value.string_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.blob_value: "google.datastore.v1.Value.blob_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.geo_point_value: "google.datastore.v1.Value.geo_point_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.entity_value: "google.datastore.v1.Value.entity_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.array_value: "google.datastore.v1.Value.array_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.meaning: "google.datastore.v1.Value.meaning" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.exclude_from_indexes: "google.datastore.v1.Value.exclude_from_indexes" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value: "google.datastore.v1.Value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.key: "google.datastore.v1.Entity.key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.properties" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry.key: "google.datastore.v1.Entity.PropertiesEntry.key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Entity.PropertiesEntry.value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry: "google.datastore.v1.Entity.PropertiesEntry" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity: "google.datastore.v1.Entity" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.partition_id: "google.datastore.v1.PartitionId" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Key.path: "google.datastore.v1.Key.PathElement" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.ArrayValue.values: "google.datastore.v1.Value" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.key_value: "google.datastore.v1.Key" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.entity_value: "google.datastore.v1.Entity" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.array_value: "google.datastore.v1.ArrayValue" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Value" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.key: "google.datastore.v1.Key" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.PropertiesEntry" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.

Hence, I used the built-in function in Apache Beam instead of google-cloud-datastore.

Below is example code that Protocol Buffers conflicts between the two libraries apache_beam.io.gcp.datastore.v1.datastoreio package and google.cloud package.

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from google.cloud import datastore


class MyPipeline():

    def _fetch_data(self):
        """Fetch data from Cloud Datastore."""
        ds = datastore.Client('my-project')
        query = ds.query(kind='my-kind')
        query_iterator = query.fetch()
        entities = list(map(self.from_datastore, next(query_iterator.pages)))

        return entities

    def run(self):
        """Run pipeline."""
        entities = self._fetch_data()

        pipeline_options = PipelineOptions()
        p = beam.Pipeline(options=pipeline_options)

        (p
         | 'Read data' >> beam.io.ReadFromDatastore('my-project')
         | 'Do some process' >> beam.io.DoFn(DoSomeProcess(entities)))

        p.run()
        result.wait_until_finish()

if __name__ == '__main__':
    MyPipeline().run()

And this is code that uses Cloud Datastore’s helper method. This code doesn’t use google.cloud package.

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1.helper import fetch_entities, get_datastore, query_pb2

class MyPipeline():

    def _fetch_data(self):
        """Fetch data from Cloud Datastore."""
        query = query_pb2.Query()
        query.kind.add().name = 'my-kind'
        entities = fetch_entities('my-project', None, query, get_datastore('my-project'))

        def entity_to_dict(entity):
            image_features = map(lambda a: a.double_value, entity.properties['image_feature'].array_value.values)
            id = entity.key.path[0].id
            color = entity.properties['color'].string_value
            return {'id': id, 'color': color}

        return map(entity_to_dict, entities)

    def run(self):
        """Run pipeline."""
        entities = self._fetch_data()

        pipeline_options = PipelineOptions()
        p = beam.Pipeline(options=pipeline_options)

        (p
         | 'Read data' >> beam.io.ReadFromDatastore('my-project')
         | 'Do some process' >> beam.io.DoFn(DoSomeProcess(entities)))

        p.run()
        result.wait_until_finish()

if __name__ == '__main__':
    MyPipeline().run()