[SPARK-24252][SQL] Add catalog registration and table catalog APIs. #21306

Closed

Conversation

Contributor

@rdblue rdblue commented May 11, 2018

What changes were proposed in this pull request?

This adds a v2 API for storage implementations to provide catalog instances to Spark. There are two parts:

  • CatalogProvider is used to instantiate and initialize catalogs via reflection, similar to data sources
  • TableCatalog provides table operations proposed in the Table Metadata API SPIP

This also adds helper classes:

  • Catalogs loads and initializes catalogs using configuration from a SQLConf
  • CaseInsensitiveStringMap is used to pass configuration to CatalogProvider via initialize
  • PartitionTransform is used to pass table partitioning without using Expression

Catalogs are configured by adding config properties starting with spark.sql.catalog.(name). The name property must specify a class that implements CatalogProvider. Other properties under the namespace (spark.sql.catalog.(name).(prop)) are passed to the provider during initialization.
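For example, a catalog named "iceberg" could be configured like this; the provider class and the uri property are illustrative, not defined by this PR:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.catalog.iceberg", "com.example.IcebergCatalogProvider") // must implement CatalogProvider
  .config("spark.sql.catalog.iceberg.uri", "thrift://metastore:9083")        // passed to initialize
  .getOrCreate()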

How was this patch tested?

This includes a suite for CaseInsensitiveStringMap and one that tests loading catalog providers.

Contributor Author

rdblue commented May 11, 2018

@henryr, @cloud-fan, @marmbrus, here's a first pass at adding a catalog mix-in to the v2 API. Please have a look and leave comments on what you'd like to change.

One thing that I don't think we need right away is the alterTable operation. We could easily remove that and add it later. For CTAS and other operations, we do need loadTable, createTable, and dropTable soon.

Contributor Author

rdblue commented Jun 26, 2018

@cloud-fan, what needs to change to get this in? I'd like to start making more PRs based on these changes.

@cloud-fan
Contributor

There are several things we need to discuss here:

  • Which catalog operations do we want to forward to the data source catalog? Currently it's create/drop/alter table, which I think is good enough for now.
  • How does Spark forward these catalog operations? IMO there are two ways (see the sketch after this list).
    • Spark provides an API so that end users can do it directly, e.g. spark.catalog("iceberg").createTable(...), or the SQL API CREATE TABLE iceberg.db1.tbl1 ....
    • When creating/dropping/altering Spark tables, also forward the operation to the data source catalog. For example, a user creates a table in Spark via CREATE TABLE t(...) USING iceberg, which creates a table entry in the Hive metastore as well as an Iceberg metadata file. When dropping this table, Spark should notify Iceberg to remove the metadata file. It's arguable whether we need this feature: if users are willing to always add the catalog prefix, they can just write CREATE TABLE iceberg.db1.tbl1 ... and SELECT ... FROM iceberg.db1.tbl1, and totally bypass the Spark catalog.
  • How do we look up table metadata in the data source catalog? Database name + table name is a common way (e.g. iceberg.db1.tbl1), but we should also consider other ways, like a path (e.g. delta.`/a/path`). Maybe we can treat a path as a table name without a database and leave it to the data source to interpret.
  • How do we define table metadata? It seems that Spark only needs to know the table schema for analysis. Maybe we can forward DESC TABLE to the data source so that Spark doesn't need to standardize table metadata.
  • How is table metadata involved in reading and writing data? When reading data without a catalog, e.g. spark.read.format("my_data_source").option("table", "my_table").load(), the data source needs to get the metadata of the given table. When reading data with a catalog, e.g. spark.table("my_data_source.my_table"), the data source also needs to get the metadata of the given table, but has to implement it in a different API (CatalogSupport). It's OK to say that data source implementations are responsible for eliminating code duplication themselves.
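A rough illustration of the two styles in the second bullet; neither API exists at this point, so both statements are hypothetical:

// Style 1: the user addresses the source catalog explicitly.
spark.sql("CREATE TABLE iceberg.db1.tbl1 (id BIGINT, data STRING)")

// Style 2: the user creates a Spark table and Spark forwards the DDL to the source catalog.
spark.sql("CREATE TABLE db1.tbl1 (id BIGINT, data STRING) USING iceberg")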

Contributor Author

rdblue commented Jul 3, 2018

@cloud-fan, thanks for the thorough feedback!

What catalog operations we want to forward to the data source catalog? Currently it's create/drop/alter table, I think it's good enough for now.

This PR introduces create, drop, and alter. We can always add more later. These are the ones that we need to implement DataSourceV2 operations and DDL support.

Spark provides an API so that end-users can do it directly. e.g. spark.catalog("iceberg").createTable(...), or SQL API CREATE TABLE iceberg.db1.tbl1 . . .

These two are the easiest and least intrusive ways to start because the data source catalog interaction is explicitly tied to a catalog. It also matches the behavior used by other systems for multiple catalogs. I think this is what we should start with and then tackle ideas like your second point.

When creating/dropping/altering Spark tables, also forward it to the data source catalog. . .

For this and a couple other questions, I don't think we need to decide right now. This PR is about getting the interface for other sources in Spark. We don't necessarily need to know all of the ways that users will call it or interact with it, like how DESC TABLE will work.

To your question here, I'm not sure whether the CREATE TABLE ... USING source syntax should use the default catalog or defer to the catalog for source or forward to both, but that doesn't need to block adding this API because I think we can decide it later. In addition, we should probably discuss this on the dev list to make sure we get the behavior right.

How to lookup the table metadata from data source catalog?

The SPIP proposes two catalog interfaces that return Table. One that uses table identifiers and one that uses paths. Data sources can implement support for both or just one. This PR includes just the support for table identifiers. We would add a similar API for path-based tables in another PR.

How to define table metadata? Maybe we can forward DESC TABLE . . .

That sounds like a reasonable idea to me. Like the behavior of USING, I don't think this is something that we have to decide right now. We can add support later as we implement table DDL. Maybe Table should return a DF that is its DESCRIBE output.

How does the table metadata involve in data reading/writing?

This is another example of something we don't need to decide yet. We have a couple different options for the behavior and will want to think them through and discuss them on the dev list. But I don't think that the behavior necessarily needs to be decided before we add this API to sources.

* Data sources must implement this interface to support logical operations that combine writing
* data with catalog tasks, like create-table-as-select.
*/
public interface CatalogSupport {
Contributor

After thinking about it more, what we really need in the near future is all about tables: create/alter/lookup/drop tables, rather than how tables are organized (e.g. databases) or how other information is stored (e.g. views/functions).

How about we call it TableSupport?

Contributor Author

Works for me.

/**
* Represents table metadata from a {@link DataSourceCatalog}.
*/
public interface Table {
Contributor

This is something we should decide now. IMO schema and properties are must-haves, but the others may not be. E.g. if a data source uses a path to look up a table, then there is no database/table name for it. And we don't have a story for dealing with partitions yet.

Contributor Author
@rdblue rdblue Jul 4, 2018

For the table metadata, I think we do need partitions. Iceberg creates partitioned tables and I'd like to start getting the DDL operations working. This is why I proposed this metadata in the SPIP a few months ago. We seem to have lazy consensus around it.

You're right about the name. How about I change it to an identifier that could be a path or a name?

Contributor Author

I updated the last comment because I thought this was referring to CatalogSupport at first. Sorry about the confusion.

Contributor Author

I'll just remove name and database. We can add them later when we figure out how we want to handle it. We need partitioning right away, though.

* Create a TableChange for renaming a field.
* <p>
* The name is used to find the field to rename. The new name will replace the name of the type.
* For example, renameColumn("a.b.c", "x") should produce column a.b.x.
Contributor

It's great to have an example showing how to use this API; can we add an example to all the methods here?

Contributor Author

Are you looking for examples in Javadoc, or an example implementation?

Contributor Author

I added an example to the Javadocs:

import TableChange._
val catalog = source.asInstanceOf[TableSupport].catalog()
catalog.alterTable(ident,
    addColumn("x", IntegerType),
    renameColumn("a", "b"),
    deleteColumn("c")
  )

* TableChange subclasses represent requested changes to a table. These are passed to
* {@link DataSourceCatalog#alterTable}.
*/
public interface TableChange {
Contributor

This is great!

return new DeleteColumn(name);
}

final class AddColumn implements TableChange {
Contributor Author

Just noticed that these aren't public, but should be because they will be passed to implementations through alterTable.

These should also implement unapply for Scala implementations.

Contributor Author

Nevermind, I forgot that these are in an interface so they are automatically public.

Contributor Author

And, I'm not sure it's possible to implement unapply in Java. Not even implementing Product works.

@rdblue rdblue force-pushed the SPARK-24252-add-datasource-v2-catalog branch from 7130d13 to 42ed4a4 on July 4, 2018 17:04
Contributor Author

rdblue commented Jul 4, 2018

@cloud-fan, I've updated this to address your comments. Thanks for the reviews!

@rdblue rdblue closed this Jul 4, 2018
@rdblue rdblue reopened this Jul 4, 2018

*/
Table createTable(TableIdentifier ident,
StructType schema,
List<Expression> partitions,
Contributor

Why is this a list of expressions? Current Spark only supports PARTITIONED BY with column names.

Contributor Author

I recommend reading the SPIP's "Proposed Changes" section, which goes into more detail than this comment can. In short, you're thinking of partitions as columns, like Hive tables do, but that is a narrow definition that prevents the underlying format from optimizing queries.

Partitions of a table are derived from the column data through some transform. For example, partitioning by day uses a day transform from a timestamp column: day(ts). Hive doesn't keep track of that transform and requires queries to handle it by inserting both ts and day columns. This leads to a few problems, including:

  • Hive has no ability to transform ts > X to the partition predicate day >= day(X). Queries that don't take into account the table's physical storage by adding partition predicates by hand will result in full table scans.
  • Users can insert any data they choose into the day partition and it is up to them to do it correctly.

Also, consider bucketing. Bucketing is also a transform that is effectively a partitioning of the table's files: bucket=hash(col) % N. The reason why bucketing is handled as a special case in Hive is that using it requires knowing the transform and relationship between the bucket number and its column. If we think of partitioning as grouping data by common values of a set of transforms, then buckets are just another partition that we can use for purposes like bucketed joins or limiting scans when looking for specific values.

If the transform is identity -- just copy the value into partition data -- then you have the same functionality that Hive provides. But by building the transformations into the partitioning layer, we can do more to optimize queries, while hiding the physical layout of a table.

Using Expression allows Spark to pass day(ts) to the data source. It is up to the source which expressions are supported. The current FS tables would reject any expression that isn't just a column reference. Iceberg supports identity, year, month, day, hour, truncate, and bucket transforms.
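To make the transform idea concrete, here is a toy illustration in plain Scala (not Spark API) of partition values being derived from column data:

import java.time.{Instant, LocalDate, ZoneOffset}

// day partitioning: the partition value is derived from the timestamp column
def day(ts: Instant): LocalDate = ts.atZone(ZoneOffset.UTC).toLocalDate

// bucketing: another transform, bucket = hash(col) % N
def bucket(numBuckets: Int, value: Any): Int = Math.floorMod(value.hashCode, numBuckets)

// Because the table records the day transform, a filter like ts > X implies the partition
// predicate day(ts) >= day(X); Hive cannot make that inference because it never records
// the relationship between ts and the user-maintained day column.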

Contributor

I read the SPIP but I still can't figure it out. How can Spark pass these "partition transforms" to the data source? The current end-user API only allows users to specify partition columns.

And why does the "partition transform" belong to the table definition? I think it's reasonable to say that a table is partitioned by the column timestamp and supports pushing down partition predicates like year(timestamp) > 2000.

Contributor Author
@rdblue rdblue Jul 6, 2018

The current end-user API only allows users to specify partition columns.

I think an example will help explain the use of expressions here. Right now, I can create a table partitioned by day like this:

CREATE TABLE t (ts timestamp, data string, day string) PARTITIONED BY (day)

Then it's up to queries to supply the right values for day. I'm proposing we change that to something like the following, which uses expressions in the PARTITIONED BY clause instead of only allowing column names:

CREATE TABLE t (ts timestamp, data string) PARTITIONED BY (date(ts));

This can handle all identity partitioning in Hive tables today and it can handle bucketing.

And why does the "partition transform" belong to a table definition?

Transforms should be passed to the table so the source can use them for the physical layout. In DataSourceV2, the source could be anything so it needs to be the component that handles the physical layout. Because we want distributed data sources, we need some way of telling them how to distribute data.

For example, I could use a partitioning expression to tell a source how to shard across PostgreSQL instances. I could also use it to define the keys in an HBase connector. Those are uses of partitioning that Spark can't handle internally.

Like Hive, Spark has only supported a limited definition of partitioning up to now, but I'd like to be able to put tables using Hive's layout behind this API eventually. I think this way of configuring partitioning is a good way to do that, while supporting what Iceberg and other sources will need.

Contributor Author

Another benefit: this would allow us to translate BUCKETED BY clauses into something we can actually pass to data sources.

Contributor

I see, and this seems like a very cool feature. My only concern is that this new feature hasn't been discussed on the dev list yet, and no JIRA ticket is tracking it. It feels a little odd to support a not-yet-existing feature in the data source v2 API. Shall we start a thread on the dev list for this new feature, and see if we can make it before 2.4?

Contributor Author

I wouldn't say this way of passing partitioning is a new feature. It's just a generalization of the existing partitioning that allows us to pass any type of partition, whether it is bucketing or column-based.

As for open discussion, this was proposed in the SPIP that was fairly widely read and commented on. That SPIP was posted to the dev list a few times, too. I do appreciate you wanting to make sure there's been a chance for the community to discuss it, but there has been plenty of opportunity to comment. At this point, I think it's reasonable to move forward with the implementation.

@rdblue rdblue changed the title [SPARK-24252][SQL] Add DataSourceV2 mix-in for catalog support. [SPARK-24252][SQL] Add DataSourceV2 mix-in for table catalog support. Jul 11, 2018

@rdblue rdblue force-pushed the SPARK-24252-add-datasource-v2-catalog branch from 023995d to 46100f3 on July 25, 2018 18:13
@rdblue rdblue changed the title [SPARK-24252][SQL] Add DataSourceV2 mix-in for table catalog support. [SPARK-24252][SQL] Add catalog registration and table catalog APIs. Jul 25, 2018

Contributor Author

rdblue commented Jul 25, 2018

@marmbrus, @cloud-fan, @gatorsmile, I've updated this PR to use reflection to instantiate catalogs. This allows implementations to provide named catalogs (and reuse implementations) and configure those catalogs with Spark configuration properties.

FYI @bersprockets, @felixcheung, @jzhuge

@@ -609,6 +611,12 @@ class SparkSession private(
*/
@transient lazy val catalog: Catalog = new CatalogImpl(self)

@transient private lazy val catalogs = new mutable.HashMap[String, CatalogProvider]()

private[sql] def catalog(name: String): CatalogProvider = synchronized {
Contributor Author

Note that this is private[sql]. This allows us to use the named TableCatalog instances without solving how multiple catalogs should be exposed to users through a public API just yet.


@tigerquoll
Contributor

Can we support column range partition predicates please?

* )
* </pre>
*/
public interface TableChange {
Contributor

Can we support adding a comment to a column or to a table?

Contributor Author

Yeah, we can add that. What do you suggest changing?

* )
* </pre>
*/
public interface TableChange {
Contributor

is adding or dropping table partitions a table change?

Contributor Author

No, this API covers table configuration, not data modification. If you're interested in dropping partitions, you should look at the DeleteSupport API proposal.

Contributor

Would it be a valid operation to change the partitioning of the table without dropping the entire table and re-creating it? E.g. change the bucket size for such and such column. Seems pretty difficult to do in practice though since the underlying data layout would have to change as part of the modification.

Contributor Author

@mccheah, our table format supports updating the partitioning of a table, so I think it should be supported. But, this is intended to be an initial API so I didn't want to block this on agreeing how to repartition a table.

Contributor Author

rdblue commented Sep 4, 2018

Can we support column range partition predicates please?

This has an "apply" transform for passing other functions directly through, so that may help if you have additional transforms that aren't committed to Spark yet.

As for range partitioning, can you be more specific about what you mean? What does that transform function look like? Part of the rationale for the existing proposal is that these are all widely used and understood. I want to make sure that as we expand the set of validated transforms, we aren't introducing confusion.

Also, could you share the use case you intend for this? It would be great to hear about uses other than just Iceberg tables.

Contributor

tigerquoll commented Sep 4, 2018 via email

Contributor

tigerquoll commented Sep 5, 2018

So Kudu range partitions support arbitrarily sized partition intervals, like the example below, where the first and last range partitions are six months in size, but the middle partition is one year in size.

-- Make a table representing a date/time value as TIMESTAMP.
-- The strings representing the partition bounds are automatically
-- cast to TIMESTAMP values.
create table native_timestamp(id bigint, when_exactly timestamp, event string, primary key (id, when_exactly))
  range (when_exactly)
  (
    partition '2015-06-01' <= values < '2016-01-01',
    partition '2016-01-01' <= values < '2017-01-01',
    partition '2017-01-01' <= values < '2017-06-01'
  )
  stored as kudu;

Contributor Author

rdblue commented Sep 6, 2018

@tigerquoll, the proposal isn't to make partitions part of table configuration. It is to make the partitioning scheme part of the table configuration. How sources choose to handle individual partitions is up to the source. How those partitions are exposed through Spark is a different API because the current v2 data source design covers tables that appear to be unpartitioned.

We could support range partitioning with the strategy that was discussed on the dev list, where the configuration is a function application with column references and literals. So your partitioning could be expressed like this:

create table t (id bigint, ts timestamp, data string)
partitioned by (range(ts, '2016-01-01', '2017-01-01', '2017-06-01')) using kudu

@felixcheung
Member

where are we on this?

Contributor Author

rdblue commented Oct 22, 2018

@felixcheung, we're waiting on more reviews and a community decision about how to pass partition transforms.

For passing transforms, I think the most reasonable compromise is to go with a generic function application, so each transform would be passed as a function/transform name with one or more arguments, where each argument is either a column reference (by name) or a literal value. That's a fairly small public API addition, but it allows a lot of different partitioning schemes to be expressed, including the one above for Kudu.

We already have all of this implemented based on the current PR, but I can update this in the next week or so.
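A minimal sketch of that compromise; the type names are illustrative, not the final API:

sealed trait TransformArg
case class ColumnReference(name: String) extends TransformArg
case class LiteralValue(value: Any) extends TransformArg
case class Transform(name: String, args: Seq[TransformArg])

// The Kudu range partitioning proposed above would be passed as:
val kuduRange = Transform("range", Seq(
  ColumnReference("ts"),
  LiteralValue("2016-01-01"), LiteralValue("2017-01-01"), LiteralValue("2017-06-01")))

// Identity partitioning and bucketing fit the same shape:
val byRegion = Transform("identity", Seq(ColumnReference("region")))
val byBucket = Transform("bucket", Seq(LiteralValue(16), ColumnReference("id")))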

/**
* Represents table metadata from a {@link TableCatalog} or other table sources.
*/
public interface Table {
Contributor

The nomenclature here appears to conflict with @cloud-fan's refactor in https://github.com/apache/spark/pull/22547/files#diff-45399ef5eed5c873d5f12bf0f1671b8fR40. Maybe we can call this TableMetadata or TableDescription? Or perhaps we rename the other construct?

Contributor Author

I think the two Table classes are trying to be the same thing. This is one of the reasons why I brought it up in the sync. @cloud-fan's current PR isn't yet based on this work, so it doesn't get the abstraction right.

What you linked to uses Table to expose newScanConfigBuilder, basically requiring that all tables are readable. Instead, the implementation classes in #22547 should be interfaces that extend this Table to make it readable.

Contributor

Looking at this patch in comparison to the other again (updated to #23086) it looks like this work should be rebased on top of the batch read refactor's PR in order to not have two Table classes that do the same thing - is this the right assessment?

@jackylee-ch
Contributor

@rdblue Have you considered the stream table API? There may be some differences between batch table DDL and stream table DDL.

Contributor Author

rdblue commented Nov 29, 2018

@stczwd, thanks for taking a look at this. What are the differences between batch and stream DDL that you think will come up?

import java.util.Map;

/**
* Represents table metadata from a {@link TableCatalog} or other table sources.
Member

Does it include a View, like what we are doing in the CatalogTable?

Contributor Author

No, this interface carries the minimal set of operations needed to implement the v2 logical plans. We can expand it later when we need to.

The goal here is to build a replacement catalog API incrementally and to avoid requiring all catalogs to implement all possible catalog features. This API is focused on table operations, not view or function operations that we have yet to define.

@gatorsmile
Member

A general question: how would we use this catalog API to implement the Hive metastore? Is it doable?

* {@code name}, is also added to the options and will contain the catalog's name; in this case,
* "catalog-name".
*/
public interface CatalogProvider {
Contributor

As we discussed, will these APIs now live in the sql-api package? Also at what point are we going to introduce this new Maven module and package?

Contributor Author

@cloud-fan, do you want me to create the sql-api package in this PR, or do you want to add a separate PR to move the current v2 API?

Contributor

jackylee-ch commented Nov 30, 2018

@stczwd, thanks for taking a look at this. What are the differences between batch and stream DDL that you think will come up?

@rdblue

  1. A source needs to be defined for the stream table.
  2. A stream table requires a special flag to indicate that it is a stream table.
  3. Users and programs need to be aware of whether a table is a stream table.
  4. What would we do if the user wants to change a stream table to a batch table, or convert a batch table to a stream table?
  5. What does the stream table metadata you define look like? What is the difference between batch table metadata and stream table metadata?
    I defined the stream table based on DataSource V1 (see Support SQLStreaming in Spark), but found that the above problems cannot be completely solved with the catalog API.
    How would you solve these in the new catalog?

import java.util.List;
import java.util.Map;

public interface TableCatalog extends CatalogProvider {
Contributor

The semantics here aren't clear at least to me. Typically a <something>Provider is a class that can instantiate a something. Here it appears to provide itself? The abstraction I would imagine would be to have either:

  • CatalogProvider has a method called get(options, sqlConf) which returns a TableCatalog configured with the given options and SQLConf, or
  • Remove CatalogProvider entirely and put initialize in this interface. Every TableCatalog instance must be initialized before calling any other methods like loadTable.

Contributor Author

The intent is to use some interface to load all catalogs, whether they implement TableCatalog, FunctionCatalog, or both (or other catalog API parts). So you load a catalog, then check whether it is a TableCatalog when you want to use it for tables.

Sounds like the name CatalogProvider is the confusing part. You're right that a provider usually implements a get method to provide something. I could change that to CatalogImpl or something. Would that work?

Contributor
@mccheah mccheah Nov 30, 2018

Perhaps something as simple as Catalog as the top level then, which is sub-interfaced by TableCatalog, FunctionCatalog, and other "kinds" of catalogs. They can all share the same initialize method which is declared by Catalog. That sounds like the simplest idea, perhaps?

Contributor Author

What about CatalogPlugin? I'm hesitant to go with just Catalog because it isn't very specific. I think it might cause confusion because the interface has only the initialize method.

Contributor

I'm ok with that!
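For reference, a minimal sketch of that shape, written as Scala traits with illustrative signatures (assuming the PR's CaseInsensitiveStringMap, TableIdentifier, Table, PartitionTransform, and TableChange types are in scope):

import org.apache.spark.sql.types.StructType

// The plugin interface only knows how to be loaded and configured.
trait CatalogPlugin {
  // options come from the spark.sql.catalog.(name).* properties and include the catalog's name
  def initialize(options: CaseInsensitiveStringMap): Unit
}

// Capabilities are expressed as sub-interfaces; table support is one of them.
trait TableCatalog extends CatalogPlugin {
  def loadTable(ident: TableIdentifier): Table
  def createTable(
      ident: TableIdentifier,
      schema: StructType,
      partitions: java.util.List[PartitionTransform],
      properties: java.util.Map[String, String]): Table
  def alterTable(ident: TableIdentifier, changes: TableChange*): Table
  def dropTable(ident: TableIdentifier): Boolean
}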

}

lazy val options: Map[String, String] = {
v1Table.storage.locationUri match {
Contributor

Consider .map(...).getOrElse, but we haven't been consistent on using or not using match on Option types throughout Spark in general anyways so it's fine to leave as-is.

Contributor

Also any particular reason this and the following variables have to be lazy?

Contributor Author

How would the getOrElse pattern work here? If the URI is undefined, what tuple should be added to the table properties?

Contributor

On a second read over this, I don't think we necessarily have to use the Option lambdas here; in fact it may be less legible, varying from developer to developer.

But if one were to do so, it'd be something like this:

v1Table.storage.properties ++ v1Table.storage.locationUri.map(uri => Map("path" -> CatalogUtils.URIToString(uri))).getOrElse(Map.empty)

* @return an Apply transform for the column
*/
public static PartitionTransform apply(String transform, String colName) {
if ("identity".equals(transform)) {
Contributor

Is it necessary to support all PartitionTransform types for a first pass? Though, I would imagine we'd have to for the v2 to v1 catalog adapter. If it weren't for that, I would suggest supporting only a simple set of PartitionTransform, such as only identity, to keep this PR focused on the catalog API and not the partitions API.

Contributor Author

What I wanted to discuss on Wednesday was how to pass these transforms. @rxin and I had some discussions about it on the dev list, but we didn't come up with a decision. I think the solution will probably be to add a way to pass a generic function application with a list of arguments that are either columns or constant literals.

Contributor Author

I should note that the generic function application will probably look like the Apply case.

Contributor

Is it possible to defer partition support, or is it fundamentally important enough to get correct now because we will be building on it in e.g. the very next evolution of this API and its uses? I'm thinking about how to minimize the amount of API we're proposing per change, particularly if the choices aren't obvious.

Contributor Author

I think we should get this done now. Partition transforms are a generalization of Hive partitioning (which uses some columns directly) and bucketing (which is one specific transform). If we add transformation functions now, we will support both of those with a simple API instead of building in special cases for identity and bucket transforms.

I also have a data source that allows users to configure partitioning using more transforms than just identity and bucketing, so I'd like to get this in so that DDL for those tables works.

* Return the table properties.
* @return this table's map of string properties
*/
Map<String, String> properties();
Contributor

Should we default this to an empty Map? I don't think all tables will support custom properties.

Contributor Author

Yeah, that works.

* Return the table partitioning transforms.
* @return this table's partitioning transforms
*/
List<PartitionTransform> partitioning();
Contributor

Should we default this to no partitioning?

Contributor Author

Sure
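A sketch of the defaults being agreed to here, written as a Scala trait for brevity (the PR defines Table as a Java interface; PartitionTransform is the PR's type):

import org.apache.spark.sql.types.StructType

trait Table {
  def schema: StructType

  // defaults: no custom properties, unpartitioned
  def properties: java.util.Map[String, String] = java.util.Collections.emptyMap()
  def partitioning: java.util.List[PartitionTransform] = java.util.Collections.emptyList()
}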

Contributor

mccheah commented Nov 30, 2018

@stczwd my understanding here is that a table isn't a streaming table or a batch table, but rather that a table points to data that can be scanned either in stream or in batch, and that the table is responsible for returning either streaming scanners or batch scanners when the logical plan calls for it. I believe this is the case because of https://github.com/apache/spark/pull/23086/files#diff-d111d7e2179b55465840c9a81ea004f2R65 and its eventual analogous streaming variant. In the new abstractions we propose here and in our proposal, the catalog gets a reference to a Table object that can build Scans over that table.

In other words, the crucial overarching theme in all of the following matters is that a Table isn't inherently a streaming or a batch table, but rather a Table supports returning streaming and/or batch scans. The table returned by the catalog is a pointer to the data, and the Scan defines how one reads that data.

Source needs to be defined for stream table

The catalog returns an instance of Table that can create Scans that support the toStream method.

Stream table requires a special flags to indicate that it is a stream table.

When one gets back a Scan, calling its toStream method will indicate that the table's data is about to be scanned in a streaming manner.

User and Program need to be aware of whether this table is a stream table.

This would probably be done from the SQL side. But I'm not as certain about this one; can you elaborate?

What would we do if the user wants to change the stream table to batch table or convert the batch table to stream table?

The new abstraction handles this at the Scan level instead of the Table level. Tables are themselves not streamed or batched, but rather they construct scans that can read them in either stream or batch; the Scan implements toBatch and/or toStream to support the appropriate read method.

What does the stream table metadata you define look like? What is the difference between batch table metadata and stream table metadata?

This I don't think is as clear given what has been proposed so far. Will let others offer comment here.

Others should feel free to offer more commentary or correct anything from above.

@jackylee-ch
Contributor

@mccheah you mean the tables users create do not distinguish between stream and batch, and the distinction only matters when they are actually read?

}

lazy val options: Map[String, String] = {
v1Table.storage.locationUri match {
Contributor Author

I use lazy for a couple reasons. First, to avoid building maps or other data values that are never used. Second, to avoid a required ordering for fields. If fields depend on one another, then they have to be reordered when those dependencies change. Lazy values never require reordering.
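A toy illustration of the ordering point; the class and field names are only for the example:

import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}

class V1TableAdapter(v1Table: CatalogTable) {
  // With strict vals, `options` would have to be declared after `location` or it would
  // see a null value at construction time; lazy vals remove that ordering constraint
  // and skip building the map entirely when it is never used.
  lazy val options: Map[String, String] =
    location.map(path => Map("path" -> path)).getOrElse(Map.empty)

  lazy val location: Option[String] =
    v1Table.storage.locationUri.map(CatalogUtils.URIToString)
}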

Contributor

mccheah commented Nov 30, 2018

@stczwd that's my understanding yeah. Others can feel free to correct me otherwise.

Contributor Author

rdblue commented Nov 30, 2018

@stczwd, I agree with @mccheah. Tables are basically named data sets. Whether they support batch, micro-batch streaming, or continuous streaming is determined by checking whether they implement SupportsBatchScan or similar interfaces. Matt's referenced docs are the right place to go for more context. The purpose here is to make catalogs and reads orthogonal. A catalog can return both batch-compatible and stream-compatible source "tables".

A "table" may be a Kafka topic or may be a file-based data source. And note that both of those can support batch and streaming execution. A Kafka topic could be CDC stream that represents a table, and a file-based source could be streamed by periodically checking for new committed files.

This PR is based on an SPIP. That has some background for why I chose the set of table attributes here (schema, partitioning, properties), but a short summary is that those are the core set of attributes that are used in comparable SQL variants and already used in Spark.
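A small sketch of that capability check; SupportsBatchScan is the name used above, and the streaming interface names here are hypothetical placeholders:

// Report which read modes a catalog-provided table supports.
def readModesFor(table: Table): Seq[String] = {
  val modes = Seq.newBuilder[String]
  if (table.isInstanceOf[SupportsBatchScan]) modes += "batch"
  if (table.isInstanceOf[SupportsMicroBatchScan]) modes += "micro-batch"  // hypothetical name
  if (table.isInstanceOf[SupportsContinuousScan]) modes += "continuous"   // hypothetical name
  modes.result()
}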

@echauchot

I can +1 the features brought by this PR, even if what I'm interested in is more of a "side" feature: the ability to use the table catalog to inject an object into the Spark sources using a custom table.

cloud-fan pushed a commit that referenced this pull request Mar 8, 2019
## What changes were proposed in this pull request?

This adds a v2 API for adding new catalog plugins to Spark.

* Catalog implementations extend `CatalogPlugin` and are loaded via reflection, similar to data sources
* `Catalogs` loads and initializes catalogs using configuration from a `SQLConf`
* `CaseInsensitiveStringMap` is used to pass configuration to `CatalogPlugin` via `initialize`

Catalogs are configured by adding config properties starting with `spark.sql.catalog.(name)`. The name property must specify a class that implements `CatalogPlugin`. Other properties under the namespace (`spark.sql.catalog.(name).(prop)`) are passed to the provider during initialization along with the catalog name.

This replaces #21306, which will be implemented in multiple parts: the catalog plugin system (this commit) and specific catalog APIs, like `TableCatalog`.

## How was this patch tested?

Added test suites for `CaseInsensitiveStringMap` and for catalog loading.

Closes #23915 from rdblue/SPARK-24252-add-v2-catalog-plugins.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Contributor Author

rdblue commented Apr 11, 2019

Closing this because it is replaced by #24246.

@rdblue rdblue closed this Apr 11, 2019
mccheah pushed a commit to palantir/spark that referenced this pull request May 15, 2019