Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions rust/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,15 @@ csv = "1.1"
num = "0.2"
regex = "1.3"
lazy_static = "1.4"
packed_simd = { version = "0.3", optional = true }
chrono = "0.4"
flatbuffers = "0.6"
hex = "0.4"
arrow-flight = { path = "../arrow-flight", optional = true }
prettytable-rs = "0.8.0"

[features]
simd = ["packed_simd"]
flight = ["arrow-flight"]
default = ["simd", "flight"]
default = ["flight"]

[dev-dependencies]
criterion = "0.3"
Expand Down
10 changes: 0 additions & 10 deletions rust/arrow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,6 @@ The above script will run the `flatc` compiler and perform some adjustments to t
- Remove `org::apache::arrow::flatbuffers` namespace
- Add includes to each generated file

## SIMD (Single Instruction Multiple Data)

Arrow uses the [packed_simd](https://crates.io/crates/packed_simd) crate to optimize many of the implementations in the
[compute](https://github.com/apache/arrow/tree/master/rust/arrow/src/compute) module using SIMD intrinsics. These
optimizations are enabled by the `simd` feature flag and are turned on by default, but can be disabled, for example:

```bash
cargo build --no-default-features
```

# Publishing to crates.io

An Arrow committer can publish this crate after an official project release has
Expand Down
110 changes: 19 additions & 91 deletions rust/arrow/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
//! The main type in the module is `Buffer`, a contiguous immutable memory region of
//! fixed size aligned at a 64-byte boundary. `MutableBuffer` is like `Buffer`, but it can
//! be mutated and grown.
#[cfg(feature = "simd")]
use packed_simd::u8x64;

use std::cmp;
use std::convert::AsRef;
Expand Down Expand Up @@ -268,27 +266,6 @@ impl<T: AsRef<[u8]>> From<T> for Buffer {
}
}

/// Helper function for SIMD `BitAnd` and `BitOr` implementations
#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))]
fn bitwise_bin_op_simd_helper<F>(left: &Buffer, right: &Buffer, op: F) -> Buffer
where
F: Fn(u8x64, u8x64) -> u8x64,
{
let mut result = MutableBuffer::new(left.len()).with_bitset(left.len(), false);
let lanes = u8x64::lanes();
for i in (0..left.len()).step_by(lanes) {
let left_data = unsafe { from_raw_parts(left.raw_data().add(i), lanes) };
let right_data = unsafe { from_raw_parts(right.raw_data().add(i), lanes) };
let result_slice: &mut [u8] = unsafe {
from_raw_parts_mut((result.data_mut().as_mut_ptr() as *mut u8).add(i), lanes)
};
unsafe {
bit_util::bitwise_bin_op_simd(&left_data, &right_data, result_slice, &op)
};
}
result.freeze()
}

impl<'a, 'b> BitAnd<&'b Buffer> for &'a Buffer {
type Output = Result<Buffer>;

Expand All @@ -299,27 +276,15 @@ impl<'a, 'b> BitAnd<&'b Buffer> for &'a Buffer {
));
}

// SIMD implementation if available
#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))]
{
return Ok(bitwise_bin_op_simd_helper(&self, &rhs, |a, b| a & b));
}

// Default implementation
#[allow(unreachable_code)]
{
let mut builder = UInt8BufferBuilder::new(self.len());
for i in 0..self.len() {
unsafe {
builder
.append(
self.data().get_unchecked(i) & rhs.data().get_unchecked(i),
)
.unwrap();
}
let mut builder = UInt8BufferBuilder::new(self.len());
for i in 0..self.len() {
unsafe {
builder
.append(self.data().get_unchecked(i) & rhs.data().get_unchecked(i))
.unwrap();
}
Ok(builder.finish())
}
Ok(builder.finish())
}
}

Expand All @@ -333,66 +298,29 @@ impl<'a, 'b> BitOr<&'b Buffer> for &'a Buffer {
));
}

// SIMD implementation if available
#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))]
{
return Ok(bitwise_bin_op_simd_helper(&self, &rhs, |a, b| a | b));
}

// Default implementation
#[allow(unreachable_code)]
{
let mut builder = UInt8BufferBuilder::new(self.len());
for i in 0..self.len() {
unsafe {
builder
.append(
self.data().get_unchecked(i) | rhs.data().get_unchecked(i),
)
.unwrap();
}
let mut builder = UInt8BufferBuilder::new(self.len());
for i in 0..self.len() {
unsafe {
builder
.append(self.data().get_unchecked(i) | rhs.data().get_unchecked(i))
.unwrap();
}
Ok(builder.finish())
}
Ok(builder.finish())
}
}

impl Not for &Buffer {
type Output = Buffer;

fn not(self) -> Buffer {
// SIMD implementation if available
#[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), feature = "simd"))]
{
let mut result =
MutableBuffer::new(self.len()).with_bitset(self.len(), false);
let lanes = u8x64::lanes();
for i in (0..self.len()).step_by(lanes) {
unsafe {
let data = from_raw_parts(self.raw_data().add(i), lanes);
let data_simd = u8x64::from_slice_unaligned_unchecked(data);
let simd_result = !data_simd;
let result_slice: &mut [u8] = from_raw_parts_mut(
(result.data_mut().as_mut_ptr() as *mut u8).add(i),
lanes,
);
simd_result.write_to_slice_unaligned_unchecked(result_slice);
}
}
return result.freeze();
}

// Default implementation
#[allow(unreachable_code)]
{
let mut builder = UInt8BufferBuilder::new(self.len());
for i in 0..self.len() {
unsafe {
builder.append(!self.data().get_unchecked(i)).unwrap();
}
let mut builder = UInt8BufferBuilder::new(self.len());
for i in 0..self.len() {
unsafe {
builder.append(!self.data().get_unchecked(i)).unwrap();
}
builder.finish()
}
builder.finish()
}
}

Expand Down
Loading