/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace folly { namespace test { namespace async_base_test_lib_detail { constexpr size_t kODirectAlign = 4096; // align reads to 4096 B (for O_DIRECT) constexpr size_t kDefaultFileSize = 6 << 20; // 6MiB constexpr size_t kBatchNumEntries = 1024; constexpr size_t kBatchSize = 192; constexpr size_t kBatchBlockSize = 4096; struct TestSpec { off_t start; size_t size; }; struct TestUtil { static void waitUntilReadable(int fd); static folly::Range readerWait( folly::AsyncBase* reader); using ManagedBuffer = std::unique_ptr; static ManagedBuffer allocateAligned(size_t size); }; template std::unique_ptr getAIO(size_t capacity, AsyncBase::PollMode pollMode) { try { auto ret = std::make_unique(capacity, pollMode); ret->initializeContext(); return ret; } catch (const std::runtime_error&) { } return nullptr; } template void testReadsSerially( const std::vector& specs, folly::AsyncBase::PollMode pollMode) { auto aioReader = getAIO(1, pollMode); SKIP_IF(!aioReader) << "TAsync not available"; typename TAsync::Op op; auto tempFile = folly::test::TempFileUtil::getTempFile(kDefaultFileSize); int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); SKIP_IF(fd == -1) << "Tempfile can't be opened with O_DIRECT: " << folly::errnoStr(errno); SCOPE_EXIT { ::close(fd); }; for (size_t i = 0; i < specs.size(); i++) { auto buf = TestUtil::allocateAligned(specs[i].size); op.pread(fd, buf.get(), specs[i].size, specs[i].start); aioReader->submit(&op); EXPECT_EQ((i + 1), aioReader->totalSubmits()); EXPECT_EQ(aioReader->pending(), 1); auto ops = test::async_base_test_lib_detail::TestUtil::readerWait(aioReader.get()); EXPECT_EQ(1, ops.size()); EXPECT_TRUE(ops[0] == &op); EXPECT_EQ(aioReader->pending(), 0); ssize_t res = op.result(); EXPECT_LE(0, res) << folly::errnoStr(-res); EXPECT_EQ(specs[i].size, res); op.reset(); } } template void testReadsParallel( const std::vector& specs, folly::AsyncBase::PollMode pollMode, bool multithreaded) { auto aioReader = getAIO(specs.size(), pollMode); SKIP_IF(!aioReader) << "TAsync not available"; std::unique_ptr ops(new typename TAsync::Op[specs.size()]); uintptr_t sizeOf = sizeof(typename TAsync::Op); std::vector bufs; bufs.reserve(specs.size()); auto tempFile = folly::test::TempFileUtil::getTempFile(kDefaultFileSize); int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); SKIP_IF(fd == -1) << "Tempfile can't be opened with O_DIRECT: " << folly::errnoStr(errno); SCOPE_EXIT { ::close(fd); }; std::vector threads; if (multithreaded) { threads.reserve(specs.size()); } for (size_t i = 0; i < specs.size(); i++) { bufs.push_back(TestUtil::allocateAligned(specs[i].size)); } auto submit = [&](size_t i) { ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start); aioReader->submit(&ops[i]); }; for (size_t i = 0; i < specs.size(); i++) { if (multithreaded) { threads.emplace_back([&submit, i] { submit(i); }); } else { submit(i); } } for (auto& t : threads) { t.join(); } std::vector pending(specs.size(), true); size_t remaining = specs.size(); while (remaining != 0) { EXPECT_EQ(remaining, aioReader->pending()); auto completed = test::async_base_test_lib_detail::TestUtil::readerWait(aioReader.get()); size_t nrRead = completed.size(); remaining -= nrRead; for (size_t i = 0; i < nrRead; i++) { int id = (reinterpret_cast(completed[i]) - reinterpret_cast(ops.get())) / sizeOf; EXPECT_GE(id, 0); EXPECT_LT(id, specs.size()); EXPECT_TRUE(pending[id]); pending[id] = false; ssize_t res = ops[id].result(); EXPECT_LE(0, res) << folly::errnoStr(-res); EXPECT_EQ(specs[id].size, res); } } EXPECT_EQ(specs.size(), aioReader->totalSubmits()); EXPECT_EQ(aioReader->pending(), 0); for (size_t i = 0; i < pending.size(); i++) { EXPECT_FALSE(pending[i]); } } template void testReadsQueued( const std::vector& specs, folly::AsyncBase::PollMode pollMode) { size_t readerCapacity = std::max(specs.size() / 2, size_t(1)); auto aioReader = getAIO(readerCapacity, pollMode); SKIP_IF(!aioReader) << "TAsync not available"; folly::AsyncBaseQueue aioQueue(aioReader.get()); std::unique_ptr ops(new typename TAsync::Op[specs.size()]); uintptr_t sizeOf = sizeof(typename TAsync::Op); std::vector bufs; auto tempFile = folly::test::TempFileUtil::getTempFile(kDefaultFileSize); int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); SKIP_IF(fd == -1) << "Tempfile can't be opened with O_DIRECT: " << folly::errnoStr(errno); SCOPE_EXIT { ::close(fd); }; for (size_t i = 0; i < specs.size(); i++) { bufs.push_back(TestUtil::allocateAligned(specs[i].size)); ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start); aioQueue.submit(&ops[i]); } std::vector pending(specs.size(), true); size_t remaining = specs.size(); while (remaining != 0) { if (remaining >= readerCapacity) { EXPECT_EQ(readerCapacity, aioReader->pending()); EXPECT_EQ(remaining - readerCapacity, aioQueue.queued()); } else { EXPECT_EQ(remaining, aioReader->pending()); EXPECT_EQ(0, aioQueue.queued()); } auto completed = test::async_base_test_lib_detail::TestUtil::readerWait(aioReader.get()); size_t nrRead = completed.size(); remaining -= nrRead; for (size_t i = 0; i < nrRead; i++) { int id = (reinterpret_cast(completed[i]) - reinterpret_cast(ops.get())) / sizeOf; EXPECT_GE(id, 0); EXPECT_LT(id, specs.size()); EXPECT_TRUE(pending[id]); pending[id] = false; ssize_t res = ops[id].result(); EXPECT_LE(0, res) << folly::errnoStr(-res); EXPECT_EQ(specs[id].size, res); } } EXPECT_EQ(specs.size(), aioReader->totalSubmits()); EXPECT_EQ(aioReader->pending(), 0); EXPECT_EQ(aioQueue.queued(), 0); for (size_t i = 0; i < pending.size(); i++) { EXPECT_FALSE(pending[i]); } } template void testReads( const std::vector& specs, folly::AsyncBase::PollMode pollMode) { testReadsSerially(specs, pollMode); testReadsParallel(specs, pollMode, false); testReadsParallel(specs, pollMode, true); testReadsQueued(specs, pollMode); } template class AsyncTest : public ::testing::Test {}; TYPED_TEST_SUITE_P(AsyncTest); TYPED_TEST_P(AsyncTest, ZeroAsyncDataNotPollable) { test::async_base_test_lib_detail::testReads( {{0, 0}}, folly::AsyncBase::NOT_POLLABLE); } TYPED_TEST_P(AsyncTest, ZeroAsyncDataPollable) { test::async_base_test_lib_detail::testReads( {{0, 0}}, folly::AsyncBase::POLLABLE); } TYPED_TEST_P(AsyncTest, SingleAsyncDataNotPollable) { test::async_base_test_lib_detail::testReads( {{0, test::async_base_test_lib_detail::kODirectAlign}}, folly::AsyncBase::NOT_POLLABLE); test::async_base_test_lib_detail::testReads( {{0, test::async_base_test_lib_detail::kODirectAlign}}, folly::AsyncBase::NOT_POLLABLE); } TYPED_TEST_P(AsyncTest, SingleAsyncDataPollable) { test::async_base_test_lib_detail::testReads( {{0, test::async_base_test_lib_detail::kODirectAlign}}, folly::AsyncBase::POLLABLE); test::async_base_test_lib_detail::testReads( {{0, test::async_base_test_lib_detail::kODirectAlign}}, folly::AsyncBase::POLLABLE); } TYPED_TEST_P(AsyncTest, MultipleAsyncDataNotPollable) { test::async_base_test_lib_detail::testReads( {{test::async_base_test_lib_detail::kODirectAlign, 2 * test::async_base_test_lib_detail::kODirectAlign}, {test::async_base_test_lib_detail::kODirectAlign, 2 * test::async_base_test_lib_detail::kODirectAlign}, {test::async_base_test_lib_detail::kODirectAlign, 4 * test::async_base_test_lib_detail::kODirectAlign}}, folly::AsyncBase::NOT_POLLABLE); test::async_base_test_lib_detail::testReads( {{test::async_base_test_lib_detail::kODirectAlign, 2 * test::async_base_test_lib_detail::kODirectAlign}, {test::async_base_test_lib_detail::kODirectAlign, 2 * test::async_base_test_lib_detail::kODirectAlign}, {test::async_base_test_lib_detail::kODirectAlign, 4 * test::async_base_test_lib_detail::kODirectAlign}}, folly::AsyncBase::NOT_POLLABLE); test::async_base_test_lib_detail::testReads( {{0, 5 * 1024 * 1024}, {test::async_base_test_lib_detail::kODirectAlign, 5 * 1024 * 1024}}, folly::AsyncBase::NOT_POLLABLE); test::async_base_test_lib_detail::testReads( { {test::async_base_test_lib_detail::kODirectAlign, 0}, {test::async_base_test_lib_detail::kODirectAlign, test::async_base_test_lib_detail::kODirectAlign}, {test::async_base_test_lib_detail::kODirectAlign, 2 * test::async_base_test_lib_detail::kODirectAlign}, {test::async_base_test_lib_detail::kODirectAlign, 20 * test::async_base_test_lib_detail::kODirectAlign}, {test::async_base_test_lib_detail::kODirectAlign, 1024 * 1024}, }, folly::AsyncBase::NOT_POLLABLE); } TYPED_TEST_P(AsyncTest, MultipleAsyncDataPollable) { test::async_base_test_lib_detail::testReads( {{test::async_base_test_lib_detail::kODirectAlign, 2 * test::async_base_test_lib_detail::kODirectAlign}, {test::async_base_test_lib_detail::kODirectAlign, 2 * test::async_base_test_lib_detail::kODirectAlign}, {test::async_base_test_lib_detail::kODirectAlign, 4 * test::async_base_test_lib_detail::kODirectAlign}}, folly::AsyncBase::POLLABLE); test::async_base_test_lib_detail::testReads( {{test::async_base_test_lib_detail::kODirectAlign, 2 * test::async_base_test_lib_detail::kODirectAlign}, {test::async_base_test_lib_detail::kODirectAlign, 2 * test::async_base_test_lib_detail::kODirectAlign}, {test::async_base_test_lib_detail::kODirectAlign, 4 * test::async_base_test_lib_detail::kODirectAlign}}, folly::AsyncBase::POLLABLE); test::async_base_test_lib_detail::testReads( {{0, 5 * 1024 * 1024}, {test::async_base_test_lib_detail::kODirectAlign, 5 * 1024 * 1024}}, folly::AsyncBase::NOT_POLLABLE); test::async_base_test_lib_detail::testReads( { {test::async_base_test_lib_detail::kODirectAlign, 0}, {test::async_base_test_lib_detail::kODirectAlign, test::async_base_test_lib_detail::kODirectAlign}, {test::async_base_test_lib_detail::kODirectAlign, 2 * test::async_base_test_lib_detail::kODirectAlign}, {test::async_base_test_lib_detail::kODirectAlign, 20 * test::async_base_test_lib_detail::kODirectAlign}, {test::async_base_test_lib_detail::kODirectAlign, 1024 * 1024}, }, folly::AsyncBase::NOT_POLLABLE); } TYPED_TEST_P(AsyncTest, ManyAsyncDataNotPollable) { { std::vector v; for (int i = 0; i < 1000; i++) { v.push_back( {off_t(test::async_base_test_lib_detail::kODirectAlign * i), test::async_base_test_lib_detail::kODirectAlign}); } test::async_base_test_lib_detail::testReads( v, folly::AsyncBase::NOT_POLLABLE); } } TYPED_TEST_P(AsyncTest, ManyAsyncDataPollable) { { std::vector v; for (int i = 0; i < 1000; i++) { v.push_back( {off_t(test::async_base_test_lib_detail::kODirectAlign * i), test::async_base_test_lib_detail::kODirectAlign}); } test::async_base_test_lib_detail::testReads( v, folly::AsyncBase::POLLABLE); } } TYPED_TEST_P(AsyncTest, NonBlockingWait) { TypeParam aioReader(1, folly::AsyncBase::NOT_POLLABLE); typename TypeParam::Op op; auto tempFile = folly::test::TempFileUtil::getTempFile(kDefaultFileSize); int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); SKIP_IF(fd == -1) << "Tempfile can't be opened with O_DIRECT: " << folly::errnoStr(errno); SCOPE_EXIT { ::close(fd); }; size_t size = 2 * test::async_base_test_lib_detail::kODirectAlign; auto buf = test::async_base_test_lib_detail::TestUtil::allocateAligned(size); op.pread(fd, buf.get(), size, 0); aioReader.submit(&op); EXPECT_EQ(aioReader.pending(), 1); folly::Range completed; while (completed.empty()) { // poll without blocking until the read request completes. completed = aioReader.wait(0); } EXPECT_EQ(completed.size(), 1); EXPECT_TRUE(completed[0] == &op); ssize_t res = op.result(); EXPECT_LE(0, res) << folly::errnoStr(-res); EXPECT_EQ(size, res); EXPECT_EQ(aioReader.pending(), 0); } TYPED_TEST_P(AsyncTest, Cancel) { constexpr size_t kNumOpsBatch1 = 10; constexpr size_t kNumOpsBatch2 = 10; TypeParam aioReader( kNumOpsBatch1 + kNumOpsBatch2, folly::AsyncBase::NOT_POLLABLE); auto tempFile = folly::test::TempFileUtil::getTempFile(kDefaultFileSize); int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); SKIP_IF(fd == -1) << "Tempfile can't be opened with O_DIRECT: " << folly::errnoStr(errno); SCOPE_EXIT { ::close(fd); }; size_t completed = 0; std::vector> ops; std::vector bufs; const auto schedule = [&](size_t n) { for (size_t i = 0; i < n; ++i) { const size_t size = 2 * test::async_base_test_lib_detail::kODirectAlign; bufs.push_back( test::async_base_test_lib_detail::TestUtil::allocateAligned(size)); ops.push_back(std::make_unique()); auto& op = *ops.back(); op.setNotificationCallback([&](folly::AsyncBaseOp*) { ++completed; }); op.pread(fd, bufs.back().get(), size, 0); aioReader.submit(&op); } }; // Mix completed and canceled operations for this test. // In order to achieve that, schedule in two batches and do partial // wait() after the first one. schedule(kNumOpsBatch1); EXPECT_EQ(aioReader.pending(), kNumOpsBatch1); EXPECT_EQ(completed, 0); auto result = aioReader.wait(1); EXPECT_GE(result.size(), 1); EXPECT_EQ(completed, result.size()); EXPECT_EQ(aioReader.pending(), kNumOpsBatch1 - result.size()); schedule(kNumOpsBatch2); EXPECT_EQ(aioReader.pending(), ops.size() - result.size()); EXPECT_EQ(completed, result.size()); auto canceled = aioReader.cancel(); EXPECT_EQ(canceled.size(), ops.size() - result.size()); EXPECT_EQ(aioReader.pending(), 0); EXPECT_EQ(completed, result.size()); size_t foundCompleted = 0; for (auto& op : ops) { if (op->state() == folly::AsyncBaseOp::State::COMPLETED) { ++foundCompleted; } else { EXPECT_TRUE(op->state() == folly::AsyncBaseOp::State::CANCELED) << *op; } } EXPECT_EQ(foundCompleted, completed); } // batch tests template class AsyncBatchTest : public ::testing::Test {}; TYPED_TEST_SUITE_P(AsyncBatchTest); TYPED_TEST_P(AsyncBatchTest, BatchRead) { TypeParam aioReader; auto tempFile = folly::test::TempFileUtil::getTempFile(kDefaultFileSize); int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY); SKIP_IF(fd == -1) << "Tempfile can't be opened with O_DIRECT: " << folly::errnoStr(errno); SCOPE_EXIT { ::close(fd); }; using OpPtr = folly::AsyncBaseOp*; std::unique_ptr ops( new typename TypeParam::Op[kBatchNumEntries]); std::unique_ptr opPtrs(new OpPtr[kBatchNumEntries]); std::vector bufs; bufs.reserve(kBatchNumEntries); size_t completed = 0; for (size_t i = 0; i < kBatchNumEntries; i++) { bufs.push_back( folly::test::async_base_test_lib_detail::TestUtil::allocateAligned( kBatchBlockSize)); auto& op = ops[i]; opPtrs[i] = &op; op.setNotificationCallback([&](folly::AsyncBaseOp*) { ++completed; }); op.pread(fd, bufs[i].get(), kBatchBlockSize, i * kBatchBlockSize); } aioReader.submit( Range(opPtrs.get(), opPtrs.get() + kBatchNumEntries)); aioReader.wait(kBatchNumEntries); CHECK_EQ(completed, kBatchNumEntries); } } // namespace async_base_test_lib_detail } // namespace test } // namespace folly