First, here is code that generates some simple data and stores it in a Google Cloud Platform BigQuery table.
It imports and uses the Apache Beam library.
The runner is Google Cloud Platform Dataflow.
Here is the code:
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

pipeline_options = PipelineOptions(
    project='project-id',
    runner='DataflowRunner',  # run on Google Cloud Platform Dataflow
    temp_location='bucket-location'
)

def pardo_dofn_methods(test=None):
    import apache_beam as beam

    class testFunction(beam.DoFn):
        def process(self, element):
            # Split the comma-separated line into BigQuery row fields.
            result = element.split(',')
            testing = {'test_column': result[0], 'test_column2': result[1],
                       'test_column3': result[2], 'test_column4': result[3]}
            return [testing]

        def finish_bundle(self):
            # DoFn lifecycle hook, called once per bundle.
            # (Originally named `finish`, which Beam never calls.)
            print('finish')

    with beam.Pipeline(options=pipeline_options) as pipeline:
        results = (
            pipeline
            | 'Generating data' >> beam.Create([
                'test1,test2,test3,test4',  # comma added between elements
                'test5,test6,test7,test8'
            ])
            | beam.ParDo(testFunction())
            | beam.io.WriteToBigQuery(
                'project-id:bigQuery-dataset.table-name',
                schema='test_column:STRING, test_column2:STRING, '
                       'test_column3:STRING, test_column4:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
            )
        )

pardo_dofn_methods()
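One thing to watch in the beam.Create list: if two adjacent string literals have no comma between them, Python silently concatenates them into a single element, so the pipeline would see one merged row instead of two. A quick pure-Python sketch of this behavior:

```python
# Adjacent string literals with no comma are concatenated by Python,
# silently merging two intended rows into one element.
rows_missing_comma = [
    'test1,test2,test3,test4'
    'test5,test6,test7,test8'
]
rows_with_comma = [
    'test1,test2,test3,test4',
    'test5,test6,test7,test8',
]
print(len(rows_missing_comma))  # 1 (merged into 'test1,...,test4test5,...')
print(len(rows_with_comma))     # 2
```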
It works fine when run.
However, it produces two warnings:
BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
experiments = p.options.view_as(DebugOptions).experiments or []
BeamDeprecationWarning: BigQuerySink is deprecated since 2.11.0. Use WriteToBigQuery instead.
kms_key=self.kms_key))
I don't understand why these warnings appear.
Thanks.