我正在研究 Dataflow,我已经通过 Python SDK 构建了自定义管道。
我想将数据流 UI 上的参数添加到我的自定义管道中。
使用附加参数。参考者https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#staticvalue https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#staticvalue
然后我就改变了add_argument
to add_value_provider_argument
遵循谷歌文档
class CustomParams(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
"--input_topic",
type = str,
)
parser.add_value_provider_argument(
"--window_size",
type = int,
default = 5,
)
def run():
pipeline_options = PipelineOptions(pipeline_args, .....)
custom_param = pipeline_options.view_as(CustomParams)
.....
pipeline | "Read PubSub Message" >> beam.io.ReadFromPubSub(custom_param.input_topic)
之后,我尝试制作 GCP 模板。上传脚本看起来像
python custom_pipeline.py \
--runner DataflowRunner \
--project YOUR_PROJECT_ID \
--staging_location gs://YOUR_BUCKET_NAME/staging \
--temp_location gs://YOUR_BUCKET_NAME/temp \
--template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME
但是当我创建上传到 GCS 的模板时出现错误,如下所示
TypeError: expected string or bytes-like object
在线上beam.io.ReadFromPubSub()
它看起来像我得到的东西add_value_provider_argument
Is 运行时值提供者目的。所以我很困惑我必须做什么才能解决这个问题?
我尝试解决这个问题,例如
转换数据类型
beam.io.ReadFromPubSub(str(custom_param.input_topic))
但出现这个错误,
ValueError: PubSub topic must be in the form "projects/<project>/topics/<topic>" (got "RuntimeValueProvider(option: input_topic, type: str, default_value: '...')").
那么请有人对此进行故障排除吗?我不知道该怎么办。