diff --git a/crossdata-ng/src/main/scala/org/apache/spark/sql/crossdata/XDDataFrame.scala b/crossdata-ng/src/main/scala/org/apache/spark/sql/crossdata/XDDataFrame.scala index bfdd7f7e5..802088820 100644 --- a/crossdata-ng/src/main/scala/org/apache/spark/sql/crossdata/XDDataFrame.scala +++ b/crossdata-ng/src/main/scala/org/apache/spark/sql/crossdata/XDDataFrame.scala @@ -17,14 +17,12 @@ package org.apache.spark.sql.crossdata import com.stratio.crossdata.sql.sources.NativeScan -import org.apache.commons.lang3.StringUtils -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.{Limit, LeafNode, LogicalPlan} import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.sources.LogicalRelation - private[sql] object XDDataframe { def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = { new XDDataframe(sqlContext, logicalPlan) @@ -47,7 +45,7 @@ private[sql] object XDDataframe { nativeExecutors match { case Seq(head) => Some(head) - case _ => { + case _ => if (nativeExecutors.sliding(2).forall { tuple => tuple(0).getClass == tuple(1).getClass }) { @@ -55,11 +53,8 @@ private[sql] object XDDataframe { } else { None } - } } } - - } } @@ -92,7 +87,6 @@ private[sql] class XDDataframe(@transient override val sqlContext: SQLContext, * @inheritdoc */ override def collect(): Array[Row] = { - // TODO take // if cache don't go through native if (sqlContext.cacheManager.lookupCachedData(this).nonEmpty) { super.collect() @@ -102,15 +96,33 @@ private[sql] class XDDataframe(@transient override val sqlContext: SQLContext, } } + /** + * @inheritdoc + */ + override def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(collect() : _*) - private[this] def executeNativeQuery(provider: NativeScan): Option[Array[Row]] = { - provider.buildScan(queryExecution.optimizedPlan) - // TODO nativeQuery may return an object array and then we could construct an array with an specific schema(this.schema) - // TODO cache? + /** + * @inheritdoc + */ + override def limit(n: Int) = XDDataframe(sqlContext, Limit(Literal(n), logicalPlan)) + + /** + * @inheritdoc + */ + override def count(): Long = { + val aggregateExpr = Seq(Alias(Count(Literal(1)), "count")()) + XDDataframe(sqlContext, Aggregate(Seq(), aggregateExpr, logicalPlan)).collect().head.getLong(0) } - override def limit(n: Int)= XDDataframe(sqlContext, Limit(Literal(n), logicalPlan)) + private[this] def executeNativeQuery(provider: NativeScan): Option[Array[Row]] = { + val rowsOption = provider.buildScan(queryExecution.optimizedPlan) + // TODO is it possible to avoid the step below? + rowsOption.map { rows => + val converter = CatalystTypeConverters.createToScalaConverter(schema) + rows.map(converter(_).asInstanceOf[Row]) + } + } } diff --git a/crossdata-ng/src/main/scala/org/apache/spark/sql/sources/crossdata/CatalystToCrossdataAdapter.scala b/crossdata-ng/src/main/scala/org/apache/spark/sql/sources/crossdata/CatalystToCrossdataAdapter.scala index 4e91a8b18..affa2bb47 100644 --- a/crossdata-ng/src/main/scala/org/apache/spark/sql/sources/crossdata/CatalystToCrossdataAdapter.scala +++ b/crossdata-ng/src/main/scala/org/apache/spark/sql/sources/crossdata/CatalystToCrossdataAdapter.scala @@ -50,6 +50,7 @@ object CatalystToCrossdataAdapter { private[this] def selectFilters(filters: Seq[Expression]) = { def translate(predicate: Expression): Option[SourceFilter] = predicate match { // TODO support more type of filters + // TODO filters which are not supported shouldn't be ignored when working with native connectors case expressions.EqualTo(a: Attribute, Literal(v, _)) => Some(sources.EqualTo(a.name, v)) case expressions.EqualTo(Literal(v, _), a: Attribute) =>