编译自:Inventory management with BigQuery and Cloud Run
很多人将 Cloud Run 作为一种网站托管的方式, Cloud Run 确实是托管的绝佳工具,但是我们可以利用 Cloud Run 做更多的事情,那么我们今天就来探索一下如何使用 Cloud Run & BigQuery 来创建一个库存管理系统。下面,我会使用爱荷华州酒类控制板数据集的一个子集为我们的虚拟商店创建一个较小的库存文件。
在这个库存管理方案中,我们将一个 CSV 文件放入 Cloud Stroage 中以批量加载新库存,您可以认为我们正在从可以导出 CSV 文件的旧库存管理系统迁移至新系统。
BigQuery 可以直接从 Cloud Storage 导入 CSV 文件,如果不需要进行任何数据转换,建议您使用内置的 CSV 加载功能。对于此次用例,只需要 CSV 中的几列,并且想要做一些其他的数据转换,所以将会编写一些代码。
为了简化代码的部署,我将 Python Functions Framework 用作脚本的外壳。为了访问 Cloud Storage和 BiqQuery,我为每个产品使用了 Python 客户端库,并且正在关注示例资源管理器中的示例。在撰写本文时,我只遇到了两个问题。
首先,如果可能的话,我不想使用本地文件系统来确保我的代码确实是无服务器的,因此我必须在内存中完全处理 CSV 文件的内容。其次,从 Cloud Storage 下载文件的示例将其保留为 Blab,而不是CSV 可以解析的内容。一些简单的命令可以将 Blob 转换为字节,然后将字节转换为 StringIO 对象可以解决该问题。
# Create necessary GCP Clients
storage_client = storage.Client()
bq_client = bigquery.Client()
# Retrieve starting inventory file from storage and parse
bucket_name = "INSERT BUCKET NAME HERE"
bucket = storage_client.bucket(bucket_name)
file_name = "INSERT FILE NAME HERE"
blob = bucket.blob(file_name)
bytedata = blob.download_as_bytes()
data = bytedata.decode("UTF-8")
csv_file_ish = StringIO(data)
inventory_reader = csv.DictReader(csv_file_ish)
其次,BigQuery 客户端要求要插入到表中的任何行都是 JSON 对象。就个人而言,我会拥有首选列表,因为我认为 CSV 文件是2D 数组,但是将行作为 JSON 对象流式传输确实意味着我不必完全按照正确的顺序获取字段。一旦理解了我的代码为什么返回 200 个错误的副本(无效的JSON有效负载),就很容易解决此问题。
Invalid JSON payload received. Unknown name "json" at 'rows[1]': Proto field is not repeating, cannot start list.
我所做的数据转换相对较小,主要由将字符串转换为整数类型组成。价格字段是一个例外。CSV 文件的价格为浮动价格,即小数点后一位。很久以前,我的老师就对我灌输了这样一个观念:价格应该始终以整数形式存储,因此$ 3.00应该存储为300。在所有浮点数变为整数的同时,我还打算将所有的价格增加 10% 的溢价。
# Create data to import
rows_to_insert = []
for row in inventory_reader:
new_row = {}
new_row["item_number"] = int(row["item_number"])
new_row["price"] = int(float(row["state_bottle_retail"])) * 110
new_row["count"] = int(row["bottles_sold"])
new_row["category"] = row["category_name"]
new_row["description"] = row["item_description"]
rows_to_insert.append(new_row)
table_id = "crbq-import-spike.crbq_import_spike.inventory"
# Insert data
errors = bq_client.insert_rows_json(table_id, rows_to_insert) # Make an API request.
简单的库存管理系统的第二部分是在购买或退货时更新 BigQuery 中的数量的方法。我再次将 Python Functions Framework 用于我的代码。如果您习惯于使用 ORM,您会惊讶地发现用于 Python 的BigQuery 客户端不包含 Update 方法。相反,推荐的更新行的方法是直接在表上执行更新查询。
我从代码收到的 http 请求中提取了 item_id 和数量,并将这两者都转换为一个 int 值,以确保接收到的值格式正确,并可以清理输入内容。然后,我可以使用字符串插值将值注入查询中,并针对BigQuery 执行查询。
# Create necessary GCP Clients
bq_client = bigquery.Client()
# Extract quantity and item ID from request
request_json = request.get_json()
item_id = int(request_json['item_id'])
quantity = int(request_json['quantity'])
update_query = (
f"""
UPDATE crbq-import-spike.crbq_import_spike.inventory
SET count = count - {quantity}
WHERE item_number = {item_id}"""
)
query_job = bq_client.query(update_query)
results = query_job.result()
for row in results:
print("{} : {} views".format(row.url, row.view_count))
return "Success"
由于 Cloud Run 可以自动水平扩展,因此我可以同时处理多个库存更新。BigQuery 会处理两个请求修改同一行时可能发生的任何竞争情况。
如果您想自己尝试,可以通过下方链接进行。
Cloud Run : https://cloud.google.com/run