Skip to content
This repository was archived by the owner on Aug 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
package org.apache.spark.sql.crossdata

import com.stratio.crossdata.sql.sources.NativeScan
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Limit, LeafNode, LogicalPlan}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.sources.LogicalRelation


private[sql] object XDDataframe {
def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
new XDDataframe(sqlContext, logicalPlan)
Expand All @@ -47,19 +45,16 @@ private[sql] object XDDataframe {

nativeExecutors match {
case Seq(head) => Some(head)
case _ => {
case _ =>
if (nativeExecutors.sliding(2).forall { tuple =>
tuple(0).getClass == tuple(1).getClass
}) {
nativeExecutors.headOption
} else {
None
}
}
}
}


}

}
Expand Down Expand Up @@ -92,7 +87,6 @@ private[sql] class XDDataframe(@transient override val sqlContext: SQLContext,
* @inheritdoc
*/
override def collect(): Array[Row] = {
// TODO take
// if cache don't go through native
if (sqlContext.cacheManager.lookupCachedData(this).nonEmpty) {
super.collect()
Expand All @@ -102,15 +96,33 @@ private[sql] class XDDataframe(@transient override val sqlContext: SQLContext,
}
}

/**
* @inheritdoc
*/
override def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(collect() : _*)

private[this] def executeNativeQuery(provider: NativeScan): Option[Array[Row]] = {
provider.buildScan(queryExecution.optimizedPlan)
// TODO nativeQuery may return an object array and then we could construct an array with an specific schema(this.schema)
// TODO cache?
/**
* @inheritdoc
*/
override def limit(n: Int) = XDDataframe(sqlContext, Limit(Literal(n), logicalPlan))

/**
* @inheritdoc
*/
override def count(): Long = {
val aggregateExpr = Seq(Alias(Count(Literal(1)), "count")())
XDDataframe(sqlContext, Aggregate(Seq(), aggregateExpr, logicalPlan)).collect().head.getLong(0)
}


override def limit(n: Int)= XDDataframe(sqlContext, Limit(Literal(n), logicalPlan))
private[this] def executeNativeQuery(provider: NativeScan): Option[Array[Row]] = {
val rowsOption = provider.buildScan(queryExecution.optimizedPlan)
// TODO is it possible to avoid the step below?
rowsOption.map { rows =>
val converter = CatalystTypeConverters.createToScalaConverter(schema)
rows.map(converter(_).asInstanceOf[Row])
}
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ object CatalystToCrossdataAdapter {
private[this] def selectFilters(filters: Seq[Expression]) = {
def translate(predicate: Expression): Option[SourceFilter] = predicate match {
// TODO support more type of filters
// TODO filters which are not supported shouldn't be ignored when working with native connectors
case expressions.EqualTo(a: Attribute, Literal(v, _)) =>
Some(sources.EqualTo(a.name, v))
case expressions.EqualTo(Literal(v, _), a: Attribute) =>
Expand Down