Solution | Use case
---|---
NoSQL database | High availability and performance
Analysis tools | Real-time aggregate queries
Ubiquitous Storage | Archiving, scale, data transformations
{'key':'value'}
Store (with indexes)

SQL vs NoSQL: Battle of the Backends
Crunching Big Data with BigQuery
Technology | Role
---|---
App Engine Datastore | Web Scale collection of user data streams: a non-relational Datastore
Google Cloud Storage | Permanent archive of raw CSV data: cloud-based storage
Google BigQuery | Analysis of very large datasets
class AddOne(pipeline.Pipeline):
    """Single-stage pipeline that increments its input by one."""

    def run(self, number):
        # The return value becomes the pipeline's default output slot.
        return number + 1


add_pipeline = AddOne(1)
add_pipeline.start()  # Start the pipeline (runs asynchronously).
pipeline_id = add_pipeline.pipeline_id  # Refer to the pipeline later by id.
# Look up a previously started pipeline by its id and, once it has
# finalized, read the result from its default output slot.
stage = AddOne.from_id(my_pipeline_id)
if stage.has_finalized:
    # do something with result
    # NOTE: original used the Python 2-only `print` statement; the
    # parenthesized call form below is valid in both Python 2 and 3.
    print(stage.outputs.default.value)
class AddTwo(pipeline.Pipeline):
    """Adds two to its input by chaining two AddOne child pipelines."""

    def run(self, number):
        # `yield` starts the child pipeline; the yielded value is a
        # future-like handle that the framework resolves for the next stage.
        intermediate = yield AddOne(number)
        yield AddOne(intermediate)
class MyPipeline(base_handler.PipelineBase):
    """Runs one full MapReduce step, then hands its output to another pipeline."""

    def run(self, parameter):
        mapreduce_output = yield mapreduce_pipeline.MapreducePipeline(
            "name_of_pipeline_step",
            "main.map_function",  # A Mapper Function
            "main.reduce_function",  # A Reduce Function
            "mapreduce.input_readers.DatastoreInputReader",  # Data Source
            "mapreduce.output_writers.FileOutputWriter",  # Data Sink
            mapper_params={},  # Custom Parameters for Mapper
            reducer_params={},  # Custom Parameters for Reducer
            shards=16)  # Workers per Job
        # Chain: the MapReduce's output files feed the next pipeline stage.
        yield AnotherPipeline(mapreduce_output)
class IteratorPipeline(base_handler.PipelineBase):
    """Maps over every Datastore entity of a kind, writing results to
    Google Cloud Storage, then loads those files into BigQuery."""

    def run(self, entity_type):
        mapper_output = yield mapreduce_pipeline.MapperPipeline(
            "Datastore_Iterator_" + entity_type,
            "main.datastore_map",
            "mapreduce.input_readers.DatastoreInputReader",
            output_writer_spec="mapreduce.output_writers.FileOutputWriter",
            params={
                # Which Datastore kind the reader iterates.
                "input_reader": {
                    "entity_kind": entity_type,
                },
                # Write a single unsharded file set to the GCS bucket.
                "output_writer": {
                    "filesystem": "gs",
                    "gs_bucket_name": GS_BUCKET,
                    "output_sharding": "none",
                },
            },
            shards=SHARDS)
        yield CloudStorageToBigQuery(mapper_output)
BigQuery requires timestamps to be expressed as Unix epoch time (an integer number of seconds since 1970-01-01 UTC).
import calendar
import time

# ISO-8601 UTC timestamp with fractional seconds and a literal 'Z' suffix.
_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'


def convert_timestamp_to_epoch(timestamp):
    """Convert an ISO-8601 UTC timestamp string to integer Unix epoch seconds.

    The fractional-second part is parsed but discarded (time.struct_time
    carries no sub-second field), so the result is whole seconds.
    """
    time_struct = time.strptime(timestamp, _TIMESTAMP_FORMAT)
    # timegm treats the struct as UTC (unlike time.mktime, which is local).
    return calendar.timegm(time_struct)
class CloudStorageToBigQuery(base_handler.PipelineBase):
    """Loads a set of Cloud Storage files into a fresh BigQuery table."""

    def run(self, files):
        # Timestamped table name so repeated runs never collide.
        table_name = 'gplus_data_%s' % datetime.utcnow().strftime(
            '%m%d%Y_%H%M%S')
        bq_jobs = bigquery_service.jobs()
        # Submit a synchronous load-job insert against the project.
        result = bq_jobs.insert(
            projectId=PROJECT_ID,
            body=build_job_data(table_name, files)).execute()
Codelab: Querying App Engine logs with BigQuery
class JoinOnSKU(base_handler.PipelineBase):
    """Joins product descriptions with product sales records on SKU."""

    def run(self):
        # Map both entity kinds through the same mapper, producing
        # (SKU, record) pairs for each.
        product_data = yield DatastoreMapperPipeline(
            mapper_spec='main.Datastore_map',
            entity_kind_spec='main.ProductDescription',
            shards=16)
        sales_data = yield DatastoreMapperPipeline(
            mapper_spec='main.Datastore_map',
            entity_kind_spec='main.ProductSales',
            shards=16)
        # Concatenate the two file lists, then group records by key so
        # each SKU's description and sales rows land in the same shard.
        all_data = yield pipeline_common.Extend(product_data, sales_data)
        shuffled_data = yield mapreduce_pipeline.ShufflePipeline(
            'Shuffle by Product ID', all_data, shards=16)
        # Reduce each grouped SKU into joined output rows on Cloud Storage.
        joined_data = yield mapreduce_pipeline.ReducePipeline(
            'Join by SKU ID',
            'main.storage_reduce',
            output_writer_spec='mapreduce.output_writers.FileOutputWriter',
            params={
                'output_writer': {
                    'filesystem': 'gs',
                    'gs_bucket_name': 'datastore_export',
                    'output_sharding': 'none',
                },
            },
            filenames=shuffled_data)
def storage_reduce(key, values):
    """Reduce step for the SKU join: emit one joined line per key.

    Args:
        key: the SKU the shuffle phase grouped on.
        values: the serialized records (descriptions and sales) sharing
            that SKU.

    Yields:
        A single newline-terminated, comma-separated string: the key
        followed by all of its grouped values.
    """
    # BUG FIX: the original yielded an undefined name `result`
    # (NameError on the first call); build the joined record explicitly.
    result = '%s,%s' % (key, ','.join(values))
    yield ('%s\n' % result)
Michael Manoochehri.