| Technology | Typical use |
|---|---|
| NoSQL database | High availability and performance |
| Analysis tools | Real-time aggregate queries |
| Ubiquitous storage | Archiving, scale, data transformations |
A {'key': 'value'} store (with indexes)
SQL vs NoSQL: Battle of the Backends
Crunching Big Data with BigQuery
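As a minimal sketch of that indexed key-value model (assuming the Python ndb client; the StreamEntry model and its properties are illustrative, not part of the original example):

from google.appengine.ext import ndb

# Illustrative model: one entry in a user's data stream.
class StreamEntry(ndb.Model):
    user_id = ndb.StringProperty()   # indexed by default, so it can be queried
    content = ndb.TextProperty()     # large value, stored unindexed
    created = ndb.DateTimeProperty(auto_now_add=True)

# Values are stored under keys; indexed properties support queries.
entry_key = StreamEntry(user_id='alice', content='hello').put()
entry = entry_key.get()
recent = StreamEntry.query(StreamEntry.user_id == 'alice').fetch(10)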
| Service | Role |
|---|---|
| App Engine Datastore | Web-scale collection of user data streams: a non-relational Datastore |
| Google Cloud Storage | Permanent archive of raw CSV data: cloud-based storage |
| Google BigQuery | Analysis of very large datasets |
import pipeline

class AddOne(pipeline.Pipeline):
    def run(self, number):
        return number + 1

add_pipeline = AddOne(1)
add_pipeline.start()  # Start the pipeline
pipeline_id = add_pipeline.pipeline_id  # Save the ID to look the pipeline up later

stage = AddOne.from_id(pipeline_id)
if stage.has_finalized:
    # Do something with the result
    print stage.outputs.default.value
class AddTwo(pipeline.Pipeline):
    def run(self, number):
        # Yielding a child pipeline returns a future; the second child
        # runs only after the first resolves, so the final output is number + 2.
        result = yield AddOne(number)
        yield AddOne(result)
class MyPipeline(base_handler.PipelineBase):
    def run(self, parameter):
        output = yield mapreduce_pipeline.MapreducePipeline(
            "name_of_pipeline_step",
            "main.map_function",  # A mapper function
            "main.reduce_function",  # A reducer function
            "mapreduce.input_readers.DatastoreInputReader",  # Data source
            "mapreduce.output_writers.FileOutputWriter",  # Data sink
            mapper_params={},  # Custom parameters for the mapper
            reducer_params={},  # Custom parameters for the reducer
            shards=16)  # Workers per job
        yield AnotherPipeline(output)
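The mapper and reducer referenced above as "main.map_function" and "main.reduce_function" are ordinary functions. A minimal sketch follows; the `sku` property is an assumption for illustration. The mapper receives one entity per call from the DatastoreInputReader and yields (key, value) pairs; the reducer receives each unique key along with the list of values emitted for it.

def map_function(entity):
    # Called once per entity read by the DatastoreInputReader;
    # emit a (key, value) pair for the shuffle phase.
    yield (entity.sku, '1')

def reduce_function(key, values):
    # Called once per unique key with all values emitted for it;
    # here, a simple count written as one output line.
    yield '%s: %d\n' % (key, len(values))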
class IteratorPipeline(base_handler.PipelineBase):
    def run(self, entity_type):
        output = yield mapreduce_pipeline.MapperPipeline(
            "Datastore_Iterator_" + entity_type,
            "main.datastore_map",
            "mapreduce.input_readers.DatastoreInputReader",
            output_writer_spec="mapreduce.output_writers.FileOutputWriter",
            params={
                "input_reader": {
                    "entity_kind": entity_type,
                },
                "output_writer": {
                    "filesystem": "gs",
                    "gs_bucket_name": GS_BUCKET,
                    "output_sharding": "none",
                },
            },
            shards=SHARDS)
        yield CloudStorageToBigQuery(output)
BigQuery requires timestamps to be expressed as Unix epoch time (an integer number of seconds):
import time, calendar

def convert_timestamp_to_epoch(timestamp):
    time_struct = time.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
    return calendar.timegm(time_struct)
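A mapper like the "main.datastore_map" referenced in IteratorPipeline above might apply this conversion while serializing each entity to a CSV row. A sketch, assuming the entity stores an ISO 8601 timestamp string and a user_id property (both illustrative):

def datastore_map(entity):
    # Emit one CSV row per entity, with the ISO 8601 timestamp
    # converted to epoch seconds so BigQuery can ingest it.
    epoch = convert_timestamp_to_epoch(entity.timestamp)
    yield '%s,%d\n' % (entity.user_id, epoch)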
from datetime import datetime

class CloudStorageToBigQuery(base_handler.PipelineBase):
    def run(self, files):
        # bigquery_service (an authorized API client), PROJECT_ID, and
        # build_job_data are defined elsewhere in the application.
        table_name = 'gplus_data_%s' % datetime.utcnow().strftime(
            '%m%d%Y_%H%M%S')
        jobs = bigquery_service.jobs()
        result = jobs.insert(projectId=PROJECT_ID,
                             body=build_job_data(table_name, files)).execute()
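build_job_data is not shown in the original; a plausible sketch is a BigQuery load-job body pointing at the CSV files the mapper wrote to Cloud Storage (the dataset name and schema fields below are assumptions for illustration):

def build_job_data(table_name, files):
    return {
        'configuration': {
            'load': {
                # FileOutputWriter reports paths like '/gs/bucket/file';
                # BigQuery expects 'gs://bucket/file' URIs.
                'sourceUris': [f.replace('/gs/', 'gs://') for f in files],
                'sourceFormat': 'CSV',
                'schema': {
                    'fields': [
                        {'name': 'user_id', 'type': 'STRING'},
                        {'name': 'timestamp', 'type': 'INTEGER'},
                    ]
                },
                'destinationTable': {
                    'projectId': PROJECT_ID,
                    'datasetId': 'gplus_dataset',  # illustrative dataset name
                    'tableId': table_name,
                },
            }
        }
    }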
Codelab: Querying App Engine logs with BigQuery
class JoinOnSKU(base_handler.PipelineBase):
    def run(self):
        product_data = yield DatastoreMapperPipeline(
            mapper_spec='main.datastore_map',
            entity_kind_spec='main.ProductDescription',
            shards=16)
        sales_data = yield DatastoreMapperPipeline(
            mapper_spec='main.datastore_map',
            entity_kind_spec='main.ProductSales',
            shards=16)
        all_data = yield pipeline_common.Extend(product_data, sales_data)
        shuffled_data = yield mapreduce_pipeline.ShufflePipeline(
            'Shuffle by Product ID',
            all_data,
            shards=16)
        join_by_sku = yield mapreduce_pipeline.ReducePipeline(
            'Join by SKU ID',
            'main.storage_reduce',
            output_writer_spec='mapreduce.output_writers.FileOutputWriter',
            params={
                'output_writer': {
                    'filesystem': 'gs',
                    'gs_bucket_name': 'datastore_export',
                    'output_sharding': 'none',
                }
            },
            filenames=shuffled_data)
def storage_reduce(key, values):
    # Do something with the values that share this key:
    # a JOIN, a count, etc.
    result = '%s: %s' % (key, ','.join(values))
    yield '%s\n' % result
Michael Manoochehri.