
Conversation

@dandy10 (Contributor) commented Oct 23, 2020

I would like to be able to configure S3-compatible IO so that alternative endpoints can be used. #10560 began moving in this direction but has been stalled for a while; the main feedback there was that pipeline options should be used for configuration.

Some open questions I have:

  • Is there a reason that a separate S3IO instance was created for every operation in the s3filesystem? If so, I can save a reference to the pipeline options in the constructor and pass that in instead of saving a single S3IO (a sketch follows this list).
  • Are there any naming considerations for the S3Options? Should they begin with a naming prefix to differentiate them from other options provided on the command line?
  • Does it make sense to pass tokens/keys in through pipeline options, or should they instead be pulled from the environment?
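
A minimal sketch of the alternative in the first question, purely illustrative (Beam FileSystem subclasses receive pipeline options in their constructor, but the exact names and wiring below are assumptions rather than this PR's code):

from apache_beam.io.aws import s3io
from apache_beam.io.filesystem import FileSystem


class S3FileSystem(FileSystem):
  def __init__(self, pipeline_options):
    super().__init__(pipeline_options)
    # Keep a reference to the options instead of a single pre-built S3IO.
    self._options = pipeline_options

  def _path_open(self, path, mode):
    # Each operation constructs its own S3IO, configured from the stored options.
    return s3io.S3IO(options=self._options).open(path, mode)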

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

Post-Commit Tests Status (on master branch): [table of Jenkins build-status badges omitted]

Pre-Commit Tests Status (on master branch): [table of Jenkins build-status badges omitted]

See .test-infra/jenkins/README for the trigger phrase, status, and link of every Jenkins job.

GitHub Actions Tests Status (on master branch): Build python source distribution and wheels · Python tests · Java tests

See CI.md for more information about GitHub Actions CI.

@dandy10 (Contributor, Author) commented Oct 23, 2020

@chamikaramj @aaltay @udim @pabloem @charlesccychen, apologies for tagging you all; you're listed on the nearest OWNERS files and I'm not sure who is most relevant.

@pabloem (Member) commented Oct 23, 2020

thanks. I can review this

@pabloem pabloem self-requested a review October 23, 2020 19:02
@pabloem (Member) commented Oct 23, 2020

Run Portable_Python PreCommit

Comment on lines 1341 to 1358
parser.add_argument(
    '--endpoint_url',
    default=None,
    help='The complete URL to use for the constructed s3 client.')
parser.add_argument(
    '--region_name',
    default=None,
    help='The name of the region associated with the client.')
parser.add_argument(
    '--api_version', default=None, help='The API version to use.')
parser.add_argument(
    '--verify',
    default=None,
    help='Whether or not to verify SSL certificates.')
parser.add_argument(
    '--use_ssl',
    default=True,
    help='Whether or not to use SSL. By default, SSL is used.')
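
For context, these flags line up with keyword arguments that boto3 itself accepts when constructing an S3 client. A rough sketch of that mapping, assuming plain boto3 rather than this PR's actual client code:

import boto3


def make_s3_client(endpoint_url=None, region_name=None, api_version=None,
                   verify=None, use_ssl=True, access_key_id=None,
                   secret_access_key=None, session_token=None):
  # Sketch only: forwards the proposed options to boto3; the PR's real client
  # construction may differ in names and defaults.
  return boto3.client(
      's3',
      endpoint_url=endpoint_url,  # e.g. a MinIO or other S3-compatible endpoint
      region_name=region_name,
      api_version=api_version,
      verify=verify,  # True/False or a path to a CA bundle
      use_ssl=use_ssl,
      aws_access_key_id=access_key_id,
      aws_secret_access_key=secret_access_key,
      aws_session_token=session_token)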
Member:
You are correct that it's desirable to use a sort of namespace prefix. Perhaps --aws_ or --aws_s3_. What do you think?

You must know more about S3 and AWS than I do. I wonder if aws_session_token, aws_secret_access_key, and aws_access_key_id in this context are specific to S3, or if they provide some sort of AWS-wide authentication?

If they're s3-specific, then maybe we should namespace them as aws_s3? Let me know what you think.

Member:

+1. Please do not use endpoint_url, region_name, api_version, verify, use_ssl and so on without a prefix.

Contributor Author:

Fair enough. I've moved them all to use the same s3 prefix, which I think makes sense given they are collected under the S3Options class. @pabloem I believe the access keys can also be used for other AWS services, although I've never actually used them that way. For now I think it makes sense to keep the more generic AWS options together with the S3 options, given that this is the only use case at the moment. If another AWS service is added in the future, it could make sense to split them up then.

Member:

Thanks! That makes sense to me. Can you fix the broken unit tests? Other than that, the change looks great (and it's very welcome, as we'd been needing it).

Contributor Author:

Will do. I tried to have a look at the failing tests and can't figure out which ones actually failed (the formatting is quite difficult to parse). Unfortunately I don't have access to Windows to run them locally. The same pattern of failures seems to be affecting #13187, which is just a comment change, so perhaps the failures are unrelated?

@pabloem pabloem self-requested a review October 24, 2020 18:47
@pabloem (Member) commented Oct 26, 2020

The pre-commit failures are from Dataflow jobs trying to start the SDK worker:
[screenshot of the worker startup error omitted]

@pabloem (Member) commented Oct 26, 2020

I am a bit confused by it. I can't repro it locally; I'll try a couple more things.

@pabloem (Member) commented Oct 26, 2020

Run Python Precommit

@dandy10 (Contributor, Author) commented Oct 27, 2020

I think it is probably due to incorrect processing of the boolean flag. I've changed to using an action on the argparser, and hopefully that should sort out the issue.
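
A small sketch of the underlying argparse pitfall and an action-based fix (the flag names here are illustrative, not necessarily what this PR settled on):

import argparse

parser = argparse.ArgumentParser()

# Pitfall: with a plain default, '--use_ssl=False' parses to the string 'False',
# which is truthy, so the option effectively stays enabled whatever the user passes.
parser.add_argument('--use_ssl_broken', default=True)

# Action-based fix: store_true yields a real boolean. Inverting the flag
# (disable rather than enable) preserves the default of using SSL.
parser.add_argument('--s3_disable_ssl', default=False, action='store_true')

args = parser.parse_args(['--use_ssl_broken=False', '--s3_disable_ssl'])
print(bool(args.use_ssl_broken))  # True -- the string 'False' is truthy
print(args.s3_disable_ssl)  # True -- a genuine boolean from the action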

@dandy10 (Contributor, Author) commented Oct 27, 2020

The two failures in the Windows test are PermissionError: [WinError 32] The process cannot access the file because it is being used by another process, so it seems the tests are not being isolated sufficiently.

@pabloem (Member) commented Oct 27, 2020

That's right; those are known flakes. Thanks @dandy10!
I'll merge once Flink PVR passes.

@pabloem (Member) commented Oct 27, 2020

Run Python_PVR_Flink PreCommit

@pabloem pabloem merged commit b35d4cc into apache:master Oct 27, 2020
@pabloem (Member) commented Oct 27, 2020

Thanks @dandy10! This is great!

@dandy10 dandy10 deleted the s3-config branch January 16, 2021 20:00
@ConverJens commented Jan 18, 2021

@dandy10 @pabloem
Great work with this PR!
I'm trying to get S3 (MinIO) to work for TFX, and I have it working everywhere except in the Beam components, where I get this strange error:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 867, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/iobase.py", line 1129, in process
    self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/filebasedsink.py", line 196, in open_writer
    return FileBasedSinkWriter(self, writer_path)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/filebasedsink.py", line 417, in __init__
    self.temp_handle = self.sink.open(temp_shard_path)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/filebasedsink.py", line 138, in open
    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/filesystems.py", line 229, in create
    return filesystem.create(path, mime_type, compression_type)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/s3filesystem.py", line 171, in create
    return self._path_open(path, 'wb', mime_type, compression_type)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/s3filesystem.py", line 151, in _path_open
    raw_file = s3io.S3IO(options=self._options).open(
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/s3io.py", line 63, in __init__
    raise ValueError('Must provide one of client or options')
ValueError: Must provide one of client or options

Do you have any idea what I'm doing wrong?

These are the Beam pipeline args that I'm supplying, and I know for sure that at least the multi-processing and number-of-workers arguments are applied:

'--direct_running_mode=multi_processing',
f'--direct_num_workers={NR_OF_CPUS}',
'--s3_endpoint_url=minio-service.kubeflow:9000',
f'--s3_access_key={ACCESS_KEY}',
f'--s3_secret_access_key={SECRET_ACCESS_KEY}',
'--s3_verify=False'

Help would be greatly appreciated!
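
Not a confirmed diagnosis, but one quick check is whether those flags actually make it into the PipelineOptions that the S3 filesystem sees. A sketch, assuming the s3_* flag names above (values are placeholders):

from apache_beam.options.pipeline_options import PipelineOptions

opts = PipelineOptions([
    '--s3_endpoint_url=http://minio-service.kubeflow:9000',
    '--s3_verify=False',
])
# If the s3_* options are recognised, they should appear in this dict. The
# traceback above is raised when S3IO is constructed without a client or
# options object, which suggests the options are not reaching the filesystem.
print(opts.get_all_options())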
