[FLINK-16048][avro] Support read/write confluent schema registry avro… #12919
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request.
Automated Checks: last check on commit bac1858 (Fri Jul 17 07:20:46 UTC 2020). Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels; labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands: the @flinkbot bot supports the following commands:
|
CI report:
Bot commands: the @flinkbot bot supports the following commands:
|
| @@ -30,7 +30,7 @@ under the License. | |||
| <artifactId>flink-avro-confluent-registry</artifactId> | |||
| <properties> | |||
| <confluent.schema.registry.version>4.1.0</confluent.schema.registry.version> | |||
| <confluent.schema.registry.version>5.5.1</confluent.schema.registry.version> | |||
sjwiesman
Jul 17, 2020
Contributor
Update the version in the License notice file
|
+1 for SR, and please tag me when you open a documentation PR for this feature.
Sure, thanks for taking care of the documentation.
0829a1d
to
edb952c
|
After a quick glimpse: could we unify these? I am quite sure something like this would work, and then you would use it like this:
| /** A {@link SchemaCoder.SchemaCoderProvider} that uses a cached schema registry | ||
| * client underlying. **/ | ||
| public class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider { |
dawidwys
Jul 21, 2020
Contributor
default scope? + @Internal
Thanks for the nice review, I have addressed your comments.
d1e4ba7
to
f0da3ce
|
Good job on this PR! I think it is close to a really good shape. I really like that we managed to remove so much code duplication! One additional question that needs to be answered before merging the PR is the discussion about the factory IDENTIFIER happening in JIRA.
| /** A {@link SchemaCoder.SchemaCoderProvider} that uses a cached schema registry | ||
| * client underlying. **/ | ||
| @Internal | ||
| class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider { |
dawidwys
Jul 22, 2020
Contributor
Do we still need to extract this class after the latest changes?
danny0405
Jul 22, 2020
Author
Contributor
Personally I prefer a normal interface to a static inner one, and we can also reuse this data structure.
dawidwys
Jul 22, 2020
Contributor
Where can we reuse it?
danny0405
Jul 22, 2020
Author
Contributor
We only need one class for the serialize/deserialize schema.
| @@ -114,23 +113,4 @@ private ConfluentRegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullab | |||
| new CachedSchemaCoderProvider(url, identityMapCapacity) | |||
| ); | |||
| } | |||
| private static class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider { | |||
dawidwys
Jul 22, 2020
Contributor
ditto
| @@ -92,25 +91,4 @@ private ConfluentRegistryAvroSerializationSchema(Class<T> recordClazz, Schema sc | |||
| new CachedSchemaCoderProvider(subject, schemaRegistryUrl, DEFAULT_IDENTITY_MAP_CAPACITY) | |||
| ); | |||
| } | |||
| private static class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider { | |||
dawidwys
Jul 22, 2020
Contributor
ditto
| public ExpectedException thrown = ExpectedException.none(); | ||
| @Before | ||
| public void before() { |
dawidwys
Jul 22, 2020
Contributor
Why do we need that in the @Before block? Can't we just initialize it statically?
danny0405
Jul 22, 2020
Author
Contributor
We can; making them static also works.
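The tradeoff discussed here can be sketched in isolation (a hedged example with made-up names, not the PR's code): a fixture that is never mutated between tests can be built once in a static initializer instead of being re-created by an `@Before` method before every test.

```java
// Hedged sketch (names invented): a static fixture is built exactly once,
// when the class is loaded, instead of once per test in an @Before method.
public class StaticFixtureDemo {
    static int buildCount = 0;

    // Initialized a single time by the class initializer.
    static final String SCHEMA_JSON = buildSchema();

    static String buildSchema() {
        buildCount++;
        return "{\"type\":\"record\",\"name\":\"Address\",\"fields\":[]}";
    }

    public static void main(String[] args) {
        // Several "tests" all read the same fixture; it was built only once.
        for (int test = 0; test < 3; test++) {
            String fixture = SCHEMA_JSON;
        }
        System.out.println("fixture built " + buildCount + " time(s)");
    }
}
```

This only works when the fixture is immutable; state reset between tests still belongs in `@Before`.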
| final Map<String, String> options = getAllOptions(); | ||
| final DynamicTableSource actualSource = createTableSource(options); | ||
| assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock; |
dawidwys
Jul 22, 2020
Contributor
Please don't use assert. I can't think of a reason to use an assert in a test. assert is an assertion you can disable via a compiler flag. Why would you want to disable assertions in tests? If you want to check the type of actualSource use e.g.
assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
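The reviewer's point can be demonstrated with a small standalone example (hypothetical, not Flink code): the `assert` keyword is a no-op unless the JVM is started with `-ea`, so a failing check can silently pass, while an explicit check always runs.

```java
// Hedged demo of why `assert` is discouraged in tests: the JVM ignores
// `assert` statements unless started with -ea (enable assertions).
public class AssertDemo {
    // Returns true only if a plainly false `assert` actually threw.
    static boolean assertFired() {
        try {
            assert false : "never enforced without -ea";
            return false; // reached when assertions are disabled
        } catch (AssertionError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // With a default JVM (no -ea) this prints "assert fired: false":
        // the broken assertion went completely unnoticed.
        System.out.println("assert fired: " + assertFired());

        // An explicit check (or a Hamcrest assertThat, as suggested above)
        // always executes, regardless of JVM flags.
        Object actualSource = "not a mock";
        if (!(actualSource instanceof Integer)) {
            System.out.println("explicit check caught the wrong type");
        }
    }
}
```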
| * Tests for {@link AvroRowDataDeserializationSchema} and | ||
| * {@link AvroRowDataSerializationSchema} for schema registry avro. | ||
| */ | ||
| public class RegistryAvroRowDataSeDeSchemaTest { |
dawidwys
Jul 22, 2020
Contributor
Those tests have nothing to do with schema registry.
They test the same logic as in AvroRowDataDeSerializationSchemaTest
danny0405
Jul 22, 2020
Author
Contributor
Yes, we can remove it.
| import static org.joda.time.DateTimeConstants.MILLIS_PER_DAY; | ||
| /** Tool class used to convert from Avro {@link GenericRecord} to {@link RowData}. **/ | ||
| public class AvroToRowDataConverters { |
dawidwys
Jul 22, 2020
Contributor
@Internal
| * | ||
| * @return data type matching the schema | ||
| */ | ||
| public static DataType convertToDataType(String avroSchemaString) { |
dawidwys
Jul 22, 2020
Contributor
Could we add tests for this method?
| */ | ||
| private transient Encoder encoder; | ||
| public AvroRowDataSerializationSchema(RowType rowType) { |
dawidwys
Jul 22, 2020
Contributor
How about we remove this ctor? IMO the logic from this ctor should be only in the factory. I know that this class in theory is PublicEvolving but practically it is only usable from Table API through the factory. Therefore in my opinion it is safe to drop this ctor.
The same applies to SerializationSchema.
danny0405
Jul 22, 2020
Author
Contributor
I would prefer a constructor with default implementation.
dawidwys
Jul 22, 2020
Contributor
Why?
danny0405
Jul 22, 2020
•
Author
Contributor
Why not? The Avro row data format defaults to SE/DE Avro without the schema registry; if other formats want to customize something, they can use the appropriate constructor. Writing the same code everywhere makes no sense.
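The pattern danny0405 argues for can be sketched like this (a hedged, self-contained mock-up with invented names, not the PR's actual classes): a convenience constructor defaults to plain Avro, delegating to the full constructor that registry-aware callers use with a custom coder provider.

```java
// Hedged sketch of constructor delegation: the no-arg ctor picks a plain-Avro
// default, the full ctor lets other formats plug in their own coder provider.
public class SerializationSchemaSketch {
    interface SchemaCoderProvider { String get(); }

    static class AvroRowDataSerializationSchema {
        private final SchemaCoderProvider coderProvider;

        // Full constructor: callers (e.g. a registry format) supply a provider.
        AvroRowDataSerializationSchema(SchemaCoderProvider coderProvider) {
            this.coderProvider = coderProvider;
        }

        // Convenience constructor: default to plain Avro, no schema registry.
        AvroRowDataSerializationSchema() {
            this(() -> "plain-avro");
        }

        String describe() { return coderProvider.get(); }
    }

    public static void main(String[] args) {
        System.out.println(new AvroRowDataSerializationSchema().describe());
        System.out.println(
            new AvroRowDataSerializationSchema(() -> "confluent-registry").describe());
    }
}
```

The counter-argument in the thread is that the defaulting logic belongs in the factory instead, keeping the schema class with a single canonical constructor.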
| * {@link AvroRowDataSerializationSchema} for schema registry avro. | ||
| */ | ||
| public class RegistryAvroRowDataSeDeSchemaTest { | ||
| private static final String ADDRESS_SCHEMA = "" + |
dawidwys
Jul 22, 2020
Contributor
IMO we should add a simple test for serializing and deserializing using schema registry. It does not need to be very in depth, but so that it checks that everything is well connected.
danny0405
Jul 23, 2020
Author
Contributor
I have added a schema registry server there and tested the new format SE/DE against the schema registry service. See the new RegistryAvroRowDataSeDeSchemaTest.
|
Would |
It depends on how we understand it; the confluent avro format is mainly designed for the schema registry. |
bc53f69
to
0d49193
dawidwys
commented on flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java in 810321d
Jul 24, 2020
|
Shall we prefix it with |
|
I'm not sure about this. Actually, we don't distinguish source/sink in formats for now. For example, |
|
If we use the |
|
"Schema registry" is established terminology, and people always say "schema registry url" [1]; the same goes for "schema registry subject". |
|
Looks good, good work! I had two last small comments. For the option keys:
|
| public void testRowDataWriteReadWithCompatibleSchema() throws Exception { | ||
| testRowDataWriteReadWithSchema(ADDRESS_SCHEMA_COMPATIBLE); | ||
| // Validates new schema has been registered. | ||
| assertThat(client.getAllVersions("address-value").size(), is(1)); |
dawidwys
Jul 24, 2020
Contributor
nit: use SUBJECT
| SchemaCoder.SchemaCoderProvider schemaCoderProvider) { | ||
| super(recordClazz, reader); | ||
| this.schemaCoderProvider = schemaCoderProvider; | ||
| this.schemaCoder = schemaCoderProvider.get(); | ||
| } | ||
| public static RegistryAvroDeserializationSchema<GenericRecord> forGeneric( |
dawidwys
Jul 24, 2020
Contributor
Remove this method? If you prefer to keep it add missing javadoc.
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory |
homepy
Oct 13, 2020
•
This file does not exist in the release jar (https://mvnrepository.com/artifact/org.apache.flink/flink-avro-confluent-registry/1.11.2), though some other files do.
Without this file, we cannot use it in sql-client.sh...
Maybe there is a mistake in the maven-shade-plugin config in pom.xml?
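For context on this kind of problem (a hedged sketch, not necessarily the cause in this module): when a jar is shaded, `META-INF/services` SPI files like the one quoted above are dropped from the output unless the shade plugin is told to merge them with a `ServicesResourceTransformer`. A minimal fragment of the relevant configuration, with version and the rest of the pom omitted:

```xml
<!-- Sketch: merge META-INF/services files into the shaded jar so that
     ServiceLoader-based factory discovery (as used by Flink's SQL
     factories) keeps working after shading. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <transformers>
      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    </transformers>
  </configuration>
</plugin>
```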
dawidwys
Oct 13, 2020
Contributor
It is not available, because it will be included in the 1.12 version.
homepy
Oct 13, 2020
Thanks.
I found it...
I need to use the master branch now...
|
@danny0405 @dawidwys |
It's because the current strategy to infer the Avro schema is to convert it from the |
|
@danny0405
I'm creating a table using this avro-confluent format:
When trying to select data I'm getting an error:
|
|
Thanks, I think this is a bug; I have logged an issue. See https://issues.apache.org/jira/browse/FLINK-19779 |
|
I have filed a fix (https://github.com/apache/flink/pull/13763/files); can you help check it if possible, @maver1ck? :) |
|
I will check... mvn is running |
|
OK. It's working. I'm able to read data. |
|
@danny0405
Schema:
For not null fields:
Schema
As you can see, for string_field we have a proper union with null (for the nullable field). For timestamp_field, in both examples the union is missing. EDIT: I added a bug report for this. |
|
@maver1ck, you are right, we ignore the nullability of |
|
Thanks @danny0405 |
|
I have updated the fix, @maver1ck , please check if you have time. |
|
@danny0405 I see code review is still in progress. |
… data from Kafka
What is the purpose of the change
Supports read/write with SQL using the schema registry Avro format.
The format details
The factory identifier (or format id)
There are 2 candidates now:
- avro-sr: the pattern borrowed from the KSQL JSON_SR format [1]
- avro-confluent: the pattern borrowed from ClickHouse AvroConfluent [2]
Personally I would prefer avro-sr because it is more concise, and Confluent is a company name, which I think is not that suitable for a format name.
The format attributes
Note: the Avro schema string is always inferred from the DDL schema, so users should keep the nullability correct (DDL types default to nullable but Avro defaults to non-nullable).
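To illustrate the nullability note (table, topic, and column names are invented, and the exact format/option keys were still under discussion in this PR):

```sql
-- Sketch: DDL column nullability drives the inferred Avro schema.
CREATE TABLE user_behavior (
  user_id BIGINT NOT NULL,  -- inferred as a non-nullable Avro long
  item_name STRING          -- DDL default is nullable, so it becomes ["null","string"]
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'format' = 'avro-confluent'
);
```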
Brief change log
- avro-sr read/write row data format
Verifying this change
Added tests.
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): no
Documentation