
[FLINK-16048][avro] Support read/write confluent schema registry avro… #12919

Merged
merged 3 commits on Jul 30, 2020

Conversation

@danny0405 (Contributor) commented Jul 17, 2020

… data from Kafka

What is the purpose of the change

Supports reading/writing data with SQL using the Schema Registry Avro format.

The format details

The factory identifier (or format id)

There are two candidates now:

avro-sr: the pattern borrowed from KSQL JSON_SR format [1]
avro-confluent: the pattern borrowed from Clickhouse AvroConfluent [2]

Personally I would prefer avro-sr because it is more concise, and Confluent is a company name, which I think is not well suited to a format name.

The format attributes

Option                    Required  Remark
schema-registry.url       yes       URL to connect to the Schema Registry service
schema-registry.subject   no        Subject name to write to the Schema Registry service; required for sinks

Note: the Avro schema string is always inferred from the DDL schema, so users should keep the nullability consistent (DDL types are nullable by default, while Avro types are non-nullable by default).
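To illustrate the nullability mapping (a hand-written sketch, not exact converter output; the field name f1 is hypothetical): a nullable DDL column is expected to map to an Avro union with null, while a NOT NULL column maps to the plain type.

-- f1 STRING (nullable, the DDL default):
{"name": "f1", "type": ["string", "null"]}

-- f1 STRING NOT NULL:
{"name": "f1", "type": "string"}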

Brief change log

  • Add avro-sr read/write row data format
  • Add tests

Verifying this change

Added tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? not documented
@flinkbot commented Jul 17, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit bac1858 (Fri Jul 17 07:20:46 UTC 2020)

Warnings:

  • 1 pom.xml file was touched: check for build and licensing issues.
  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • 1. The [description] looks good.
  • 2. There is [consensus] that the contribution should go into Flink.
  • 3. Needs [attention] from.
  • 4. The change fits into the overall [architecture].
  • 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier
@flinkbot commented Jul 17, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build
@@ -30,7 +30,7 @@ under the License.
 <artifactId>flink-avro-confluent-registry</artifactId>

 <properties>
-<confluent.schema.registry.version>4.1.0</confluent.schema.registry.version>
+<confluent.schema.registry.version>5.5.1</confluent.schema.registry.version>

sjwiesman (Contributor) commented Jul 17, 2020:

Update the version in the License notice file

@sjwiesman (Contributor) commented Jul 17, 2020

+1 for SR and please tag me when you open a documentation PR for this feature

@danny0405 (Contributor, Author) commented Jul 18, 2020

> +1 for SR and please tag me when you open a documentation PR for this feature

Sure, thanks for taking care of the documentation.

@danny0405 danny0405 force-pushed the danny0405:FLINK-16048 branch 2 times, most recently from 0829a1d to edb952c Jul 20, 2020
@dawidwys (Contributor) left a comment

After a quick glimpse: could we unify the AvroRowDataDeserializationSchema with ConfluentRegistryAvroRowDataDeserializationSchema and RegistryAvroRowDataSerializationSchema?
I really believe we need just a single Avro row deserialization schema for the Table API.

I am quite sure something like this would work:

public class RowDataDeserializationSchema implements DeserializationSchema<RowData>, ResultTypeQueryable<RowData> {
	private final DeserializationSchema<GenericRecord> nestedSchema;
	private final DeserializationRuntimeConverter runtimeConverter;
	private final TypeInformation<RowData> resultType;

	public RowDataDeserializationSchema(
			DeserializationSchema<GenericRecord> nestedSchema,
			DeserializationRuntimeConverter runtimeConverter,
			TypeInformation<RowData> resultType) {
		this.nestedSchema = nestedSchema;
		this.runtimeConverter = runtimeConverter;
		this.resultType = resultType;
	}

	@Override
	public void open(InitializationContext context) throws Exception {
		nestedSchema.open(context);
	}

	@Override
	public RowData deserialize(byte[] message) throws IOException {
		try {
			GenericRecord record = nestedSchema.deserialize(message);
			return (RowData) runtimeConverter.convert(record);
		} catch (Exception e) {
			throw new IOException("Failed to deserialize Avro record.", e);
		}
	}

	@Override
	public boolean isEndOfStream(RowData nextElement) {
		return false;
	}

	@Override
	public TypeInformation<RowData> getProducedType() {
		return resultType;
	}
}

and then you would use it like this:

in AvroFormatFactory:

new RowDataDeserializationSchema(
	AvroDeserializationSchema.forGeneric(AvroSchemaConverter.convertToSchema(rowType)),
	createRowConverter(rowType), // we would need to move this method to some utils class or to a common abstract class for factories
	rowDataTypeInfo
);

in RegistryAvroFormatFactory:

new RowDataDeserializationSchema(
	ConfluentRegistryAvroDeserializationSchema.forGeneric(
		AvroSchemaConverter.convertToSchema(rowType),
		schemaRegistryURL
	),
	createRowConverter(rowType),
	rowDataTypeInfo
);

/** A {@link SchemaCoder.SchemaCoderProvider} that uses a cached schema registry
* client underlying. **/
public class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {

dawidwys (Contributor) commented Jul 21, 2020:

default scope? + @Internal

@danny0405 (Contributor, Author) commented Jul 21, 2020

> After a quick glimpse. Could we unify the AvroRowDataDeserializationSchema with ConfluentRegistryAvroRowDataDeserializationSchema and RegistryAvroRowDataSerializationSchema?
> I really believe we need just a single AvroRowDeserializationSchema for avro for table API.
> […quoted code suggestion trimmed; it repeats the review comment above verbatim…]

Thanks for the nice review, I have addressed your comments.

@danny0405 danny0405 force-pushed the danny0405:FLINK-16048 branch 2 times, most recently from d1e4ba7 to f0da3ce Jul 22, 2020
@dawidwys (Contributor) left a comment

Good job on this PR! I think it is close to a really good shape. I really like that we managed to remove so much code duplication!

One additional question that needs to be answered before merging the PR is the discussion in JIRA about the factory IDENTIFIER.

/** A {@link SchemaCoder.SchemaCoderProvider} that uses a cached schema registry
* client underlying. **/
@Internal
class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {

dawidwys (Contributor) commented Jul 22, 2020:
Do we still need to extract this class after the latest changes?

danny0405 (Author, Contributor) commented Jul 22, 2020:

Personally I prefer a normal top-level class to a static inner one, and we can also reuse this data structure.

dawidwys (Contributor) commented Jul 22, 2020:
Where can we reuse it?

danny0405 (Author, Contributor) commented Jul 22, 2020:
We only need one class for the serialize/deserialize schema.

@@ -114,23 +113,4 @@ private ConfluentRegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullab
new CachedSchemaCoderProvider(url, identityMapCapacity)
);
}

private static class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {

dawidwys (Contributor) commented Jul 22, 2020:
ditto

@@ -92,25 +91,4 @@ private ConfluentRegistryAvroSerializationSchema(Class<T> recordClazz, Schema sc
new CachedSchemaCoderProvider(subject, schemaRegistryUrl, DEFAULT_IDENTITY_MAP_CAPACITY)
);
}

private static class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {

dawidwys (Contributor) commented Jul 22, 2020:
ditto

public ExpectedException thrown = ExpectedException.none();

@Before
public void before() {

dawidwys (Contributor) commented Jul 22, 2020:
Why do we need that in the @Before block? Can't we just initialize it statically?

danny0405 (Author, Contributor) commented Jul 22, 2020:

We can; making them static also works.

final Map<String, String> options = getAllOptions();

final DynamicTableSource actualSource = createTableSource(options);
assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;

dawidwys (Contributor) commented Jul 22, 2020:

Please don't use assert. I can't think of a reason to use an assert in a test: assert statements are disabled at runtime unless the JVM is started with the -ea flag, so such checks can be silently skipped. Why would you want assertions that can be disabled in tests? If you want to check the type of actualSource, use e.g.
assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));

* Tests for {@link AvroRowDataDeserializationSchema} and
* {@link AvroRowDataSerializationSchema} for schema registry avro.
*/
public class RegistryAvroRowDataSeDeSchemaTest {

dawidwys (Contributor) commented Jul 22, 2020:
Those tests have nothing to do with schema registry.

They test the same logic as in AvroRowDataDeSerializationSchemaTest

danny0405 (Author, Contributor) commented Jul 22, 2020:
Yes, we can remove it.

import static org.joda.time.DateTimeConstants.MILLIS_PER_DAY;

/** Tool class used to convert from Avro {@link GenericRecord} to {@link RowData}. **/
public class AvroToRowDataConverters {

dawidwys (Contributor) commented Jul 22, 2020:
@Internal

*
* @return data type matching the schema
*/
public static DataType convertToDataType(String avroSchemaString) {

dawidwys (Contributor) commented Jul 22, 2020:
Could we add tests for this method?

*/
private transient Encoder encoder;
public AvroRowDataSerializationSchema(RowType rowType) {

dawidwys (Contributor) commented Jul 22, 2020:
How about we remove this ctor? IMO the logic from this ctor should be only in the factory. I know that this class in theory is PublicEvolving but practically it is only usable from Table API through the factory. Therefore in my opinion it is safe to drop this ctor.

The same applies to SerializationSchema.

danny0405 (Author, Contributor) commented Jul 22, 2020:
I would prefer a constructor with default implementation.

dawidwys (Contributor) commented Jul 22, 2020:
Why?

danny0405 (Author, Contributor) commented Jul 22, 2020:

Why not? The Avro row data format defaults to serializing/deserializing Avro without a schema registry; if other formats want to customize something, they can use the full constructor. Writing the same code everywhere makes no sense.

* {@link AvroRowDataSerializationSchema} for schema registry avro.
*/
public class RegistryAvroRowDataSeDeSchemaTest {
private static final String ADDRESS_SCHEMA = "" +

dawidwys (Contributor) commented Jul 22, 2020:

IMO we should add a simple test for serializing and deserializing using the schema registry. It does not need to be very in-depth, but it should check that everything is wired together correctly.

danny0405 (Author, Contributor) commented Jul 23, 2020:

I have added a schema registry server in the test and exercised the new format's serialization/deserialization with a connection to the schema registry service. See the new RegistryAvroRowDataSeDeSchemaTest.

@danny0405 danny0405 force-pushed the danny0405:FLINK-16048 branch from f006afe to 9104e12 Jul 22, 2020
@KurtYoung (Contributor) commented Jul 23, 2020

Would flink-avro-confluent be a better module name than flink-avro-confluent-registry? IMO registry has nothing to do with the format itself.

@danny0405 (Contributor, Author) commented Jul 23, 2020

> Would flink-avro-confluent be a better module name than flink-avro-confluent-registry? IMO registry has nothing to do with the format itself.

It depends on how we understand it; the Confluent Avro format is mainly designed for the Schema Registry.

@danny0405 danny0405 force-pushed the danny0405:FLINK-16048 branch 7 times, most recently from bc53f69 to 0d49193 Jul 23, 2020
@danny0405 danny0405 force-pushed the danny0405:FLINK-16048 branch from 0d49193 to 810321d Jul 24, 2020

@wuchong (Member) replied Jul 24, 2020:

I'm not sure about this. Actually, we don't distinguish source/sink in format options for now. For example, csv has ignore-parse-errors and allow-comments, which are only available in deserialization.

dawidwys (Contributor) replied Jul 24, 2020:

I think we aimed to reduce the prefixes. Shouldn't it be just url or schema-registry-url?
In the case of subject, I think we should not prefix it with schema-registry.

However, I am not good at coming up with option names, so I would like to hear what @wuchong or @twalthr think about this.

@wuchong (Member) replied Jul 24, 2020:

If we use avro-confluent as the identifier, I prefer to keep schema-registry to make it clear what the URL is for. For example, avro-confluent.schema-registry.url is more self-explanatory than avro-confluent.url about the URL being the schema registry URL.
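With that naming, the table options would look like the following (a sketch using the keys discussed in this thread; the URL and subject values are hypothetical placeholders):

'format' = 'avro-confluent',
'avro-confluent.schema-registry.url' = 'http://localhost:8081',
'avro-confluent.schema-registry.subject' = 'test-value'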

@danny0405 danny0405 force-pushed the danny0405:FLINK-16048 branch from 38511b7 to 6bd8c02 Jul 24, 2020
@danny0405 (Contributor, Author) commented Jul 24, 2020

Schema registry is an established term and people always say "schema registry url" [1]; the same goes for "schema registry subject".
[1] https://docs.confluent.io/current/schema-registry/index.html#high-availability-for-single-primary-setup

@dawidwys (Contributor) left a comment

Looks good. Good work!

Had two last small comments.

For the option keys:

  1. I understand the schema-registry.url; I am good with it.
  2. Still not convinced about schema-registry.subject. I think it could safely be stripped to subject. Nevertheless, I am fine with it as it is now.
public void testRowDataWriteReadWithCompatibleSchema() throws Exception {
testRowDataWriteReadWithSchema(ADDRESS_SCHEMA_COMPATIBLE);
// Validates new schema has been registered.
assertThat(client.getAllVersions("address-value").size(), is(1));

dawidwys (Contributor) commented Jul 24, 2020:
nit: use SUBJECT

SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
super(recordClazz, reader);
this.schemaCoderProvider = schemaCoderProvider;
this.schemaCoder = schemaCoderProvider.get();
}

public static RegistryAvroDeserializationSchema<GenericRecord> forGeneric(

dawidwys (Contributor) commented Jul 24, 2020:

Remove this method? If you prefer to keep it, add the missing javadoc.

# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory

homepy commented Oct 13, 2020:

This file does not exist in the release jar (https://mvnrepository.com/artifact/org.apache.flink/flink-avro-confluent-registry/1.11.2), though some other files do. Without this file, we cannot use the format in sql-client.sh. Maybe there is a mistake in the maven-shade-plugin config in pom.xml?

dawidwys (Contributor) commented Oct 13, 2020:

It is not available because it will be included in the 1.12 release.

homepy commented Oct 13, 2020:

Thanks, I found it. I need to use the master branch for now.

@maver1ck commented Oct 22, 2020

@danny0405 @dawidwys
Is there a reason all the fields read and written by this format have the prefix 'record_'? (I'm using Flink SQL for this.)
I found the code that is probably responsible, but I still have a problem with this solution.

@danny0405 (Contributor, Author) commented Oct 23, 2020

> @danny0405 @dawidwys
> Any reasons all the fields read and written by this format has prefix 'record_' ? (I'm using flink sql for this client)
> I found responsible code probably here but still have problem with this solution:

It's because the current strategy infers the Avro schema by converting from the CREATE TABLE DDL, and there is no way to get the record name there, so we use a constant "record" name as a prefix. The records written out all have explicit field names, but the types should be compatible.
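In other words, the schema inferred from the DDL does not use the name the producer registered, so reader and writer schemas can disagree on names. An illustrative sketch of the reported mismatch (not the exact converter output; the field f1 is taken from the example below):

-- Schema registered by the producer:
{"type": "record", "name": "myrecord", "fields": [{"name": "f1", "type": "string"}]}

-- Schema inferred from the DDL (constant "record" name used as prefix):
{"type": "record", "name": "record", "fields": [{"name": "record_f1", "type": "string"}]}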

@maver1ck commented Oct 23, 2020

@danny0405
The problem is that this is not compatible. I'm unable to read anything from Kafka using the Confluent Registry. Example:
I have data in Kafka with the following value schema:

{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {
      "name": "f1",
      "type": "string"
    }
  ]
}

I'm creating a table using the avro-confluent format:

create table `test` (
  `f1` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test1234',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'avro-confluent',
  'avro-confluent.schema-registry.url' = 'http://localhost:8081'
);

When trying to select data I'm getting error:

SELECT * FROM test;
[ERROR] Could not execute SQL statement. Reason:
org.apache.avro.AvroTypeException: Found myrecord, expecting record, missing required field record_f1
@danny0405 (Contributor, Author) commented Oct 23, 2020

Thanks, I think this is a bug; I have logged an issue: https://issues.apache.org/jira/browse/FLINK-19779

@danny0405 (Contributor, Author) commented Oct 23, 2020

I have filed a fix (https://github.com/apache/flink/pull/13763/files); can you help check it if possible, @maver1ck? :)

@maver1ck commented Oct 23, 2020

I will check... mvn is running

@maver1ck commented Oct 23, 2020

OK. It's working. I'm able to read data.

@maver1ck commented Oct 23, 2020

@danny0405
I think we have one more problem. When Flink creates the schema in the registry, nullability is not properly set for logical types.
Example table:

create table `test_logical_null` (
  `string_field` STRING,
  `timestamp_field` TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-logical-null',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test12345',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'avro-confluent', -- Must be set to 'avro-confluent' to configure this format.
  'avro-confluent.schema-registry.url' = 'http://localhost:8081', -- URL to connect to Confluent Schema Registry
  'avro-confluent.schema-registry.subject' = 'test-logical-null' -- Subject name to write to the Schema Registry service; required for sinks
);

Schema:

{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "string_field",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "timestamp_field",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

For NOT NULL fields:

create table `test_logical_notnull` (
  `string_field` STRING NOT NULL,
  `timestamp_field` TIMESTAMP(3) NOT NULL
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-logical-notnull',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test12345',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'avro-confluent', -- Must be set to 'avro-confluent' to configure this format.
  'avro-confluent.schema-registry.url' = 'http://localhost:8081', -- URL to connect to Confluent Schema Registry
  'avro-confluent.schema-registry.subject' = 'test-logical-notnull-value' -- Subject name to write to the Schema Registry service; required for sinks
);

Schema:

{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "string_field",
      "type": "string"
    },
    {
      "name": "timestamp_field",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

As you can see, for string_field we have the proper union with null (for the nullable field). For timestamp_field the union is missing in both examples.

EDIT: I added a bug report for this:
https://issues.apache.org/jira/browse/FLINK-19786
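For reference, the schema one would expect for the nullable TIMESTAMP(3) column wraps the logical type in a union with null, the same way string_field is handled above (a sketch of the expected output, not what the converter produced at the time):

{
  "name": "timestamp_field",
  "type": [
    {"type": "long", "logicalType": "timestamp-millis"},
    "null"
  ]
}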

@danny0405 (Contributor, Author) commented Oct 26, 2020

@maver1ck, you are right; we ignore the nullability of TIMESTAMP_WITHOUT_TIME_ZONE, DATE, TIME_WITHOUT_TIME_ZONE, and DECIMAL. I will fix them all together in this PR.

@maver1ck commented Oct 26, 2020

Thanks @danny0405

@danny0405 (Contributor, Author) commented Oct 26, 2020

I have updated the fix, @maver1ck; please check when you have time.

@maver1ck commented Oct 26, 2020

@danny0405 I see the code review is still in progress.
Could you please let me know when this PR has been polished a little?
