Russian Qt Forum

Qt => Многопоточное программирование, процессы => Тема начата: ssoft от Декабрь 06, 2017, 09:00



Название: Реализация простейшего lock-free кольцевого буф
Отправлено: ssoft от Декабрь 06, 2017, 09:00
Задача - реализация lock-free кольцевого буфера для обмена сообщениями.
Предполагалась простая модель - размер буфера заранее определен.
С буфером могут одновременно работать несколько писателей и читателей на конкурентной основе.
Если писать некуда, писатели ожидают. Если читать нечего, читатели ожидают.

Алгоритм записи такой.
Писатель атомарно сдвигает головку записи, "захватывая" позицию ячейки буфера для записи.
С каждой позицией связан атомарный флаг, который показывает есть в ней запись или нет.
Если запись существует, то писатель ожидает. Как только писатель завершает запись данных в буфер, флаг наличия записи выставляется в true.

Читатель атомарно сдвигает головку чтения, "захватывая" позицию ячейки буфера для чтения.
Если запись еще не существует, то писатель ожидает. Как только писатель завершает чтение данных из буфера, флаг наличия записи выставляется в false.

Вроде все логично и просто, но реализация ниже не работает. Операции с данными убрал, оставил только операции с головками и флагами.
Читатель "пропускает" записи.

Код
C++ (Qt)
#include <atomic>
#include <cstring>
#include <iostream>
#include <thread>
 
struct Buffer
{
   enum { MaxCount = 0x0004 };
 
   struct Packet
   {
       std::atomic< bool > m_is_exists;
       Packet () : m_is_exists() {}
   };
 
   std::atomic< uint32_t > m_write_head;
   std::atomic< uint32_t > m_read_head;
   Packet m_packet[ MaxCount ];
 
   void push ()
   {
       uint32_t pos = ( m_write_head++ ) % MaxCount;
       while( m_packet[ pos ].m_is_exists );
       m_packet[ pos ].m_is_exists = true;
   }
 
   void pop ()
   {
       uint32_t pos = ( m_read_head++ ) % MaxCount;
       while ( !m_packet[ pos ].m_is_exists );
       m_packet[ pos ].m_is_exists = false;
   }
};
 
 
static Buffer global_buffer;
static std::atomic< uint32_t > write_count;
static std::atomic< uint32_t > read_count;
 
void pushValue ()
{
   global_buffer.push();
   ++write_count;
}
 
void popValue ()
{
   global_buffer.pop();
   ++read_count;
}
 
void write ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       pushValue();
}
 
void read ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       popValue();
}
 
 
int main ( int /*argc*/, char */*argv*/[] )
{
 
   std::thread first_writer( write, 10000000 );
   std::thread second_writer( write, 20000000 );
   std::thread reader( read, 30000000 );
 
   first_writer.join();
   second_writer.join();
   reader.join();
 
   std::cout << "Write count: " << write_count.load() << std::endl;
   std::cout << "Read count: " << read_count.load() << std::endl;
 
   return 0;
}
 


Название: Re: Реализация простейшего lock-free кольцевого буф
Отправлено: Old от Декабрь 06, 2017, 09:59
Код
C++ (Qt)
struct Buffer
{
enum { MaxCount = 0x0004 };
 
struct Packet
{
std::atomic< bool > m_is_exists;
Packet () : m_is_exists() {}
};
 
std::atomic< uint32_t > m_write_head;
std::atomic< uint32_t > m_read_head;
Packet m_packet[ MaxCount ];
 
void push ()
{
uint32_t pos = ( m_write_head++ ) % MaxCount;
bool expected = false;
while( !m_packet[ pos ].m_is_exists.compare_exchange_weak( expected, true ) );
}
 
void pop ()
{
uint32_t pos = ( m_read_head++ ) % MaxCount;
bool expected = true;
while( !m_packet[ pos ].m_is_exists.compare_exchange_weak( expected, false ) );
}
};
 


Название: Re: Реализация простейшего lock-free кольцевого буф
Отправлено: ssoft от Декабрь 06, 2017, 10:31
Спасибо). Так работает.
Но хотелось бы понять в чем кроется ошибка в изначальном примере, чтобы избежать подобных в дальнейшем.
Казалось бы load() (оператор преобразования к типу) и store() (оператор присвоения) также атомарны.
И порядок их применения не может быть изменён ни компилятором, ни CPU.

Какой сценарий поведения приводит к ошибке?


Название: Re: Реализация простейшего lock-free кольцевого буф
Отправлено: ssoft от Декабрь 06, 2017, 11:22
И вот еще, когда с добавились данные Data (по сути любой тип, в примере int), то работа с ними получилась не атомарной.

Код
C++ (Qt)
#include <atomic>
#include <cassert>
#include <cstring>
#include <iostream>
#include <thread>
 
typedef int Data;
 
struct Buffer
{
   enum { MaxCount = 0x0004 };
 
   struct Packet
   {
       Data m_value;
       std::atomic< bool > m_is_exists;
       Packet () : m_value(), m_is_exists() {}
   };
 
   std::atomic< uint32_t > m_write_head;
   std::atomic< uint32_t > m_read_head;
   std::atomic< uint32_t > m_count;
   Packet m_packet[ MaxCount ];
 
   bool push ( const Data & value )
   {
       if ( m_count >= MaxCount )
           return false;
 
       uint32_t pos = ( m_write_head++ ) % MaxCount;
       bool expected = false;
       while( !m_packet[ pos ].m_is_exists.compare_exchange_weak( expected, true ) );
       m_packet[ pos ].m_value = value;
       m_packet[ pos ].m_is_exists = true;
       ++m_count;
       return true;
   }
 
   bool pop ( Data & value )
   {
       if ( m_count == 0 )
           return false;
       uint32_t pos = ( m_read_head++ ) % MaxCount;
       bool expected = true;
       while( !m_packet[ pos ].m_is_exists.compare_exchange_weak( expected, false ) );
       value = m_packet[ pos ].m_value;
       m_packet[ pos ].m_value = Data();
       m_packet[ pos ].m_is_exists = false;
       --m_count;
       return true;
   }
};
 
 
static Buffer global_buffer;
static std::atomic< uint32_t > write_count;
static std::atomic< uint32_t > read_count;
 
void pushValue ()
{
   Data value = 1;
   while( !global_buffer.push( value ) );
   ++write_count;
}
 
void popValue ()
{
   Data value;
   while( !global_buffer.pop( value ) );
   assert( value == 1 );
   ++read_count;
}
 
void write ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       pushValue();
}
 
void read ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       popValue();
}
 
 
int main ( int /*argc*/, char */*argv*/[] )
{
 
   std::thread first_writer( write, 10000000 );
   std::thread second_writer( write, 20000000 );
   std::thread reader( read, 30000000 );
 
   first_writer.join();
   second_writer.join();
   reader.join();
 
   std::cout << "Write count: " << write_count.load() << std::endl;
   std::cout << "Read count: " << read_count.load() << std::endl;
 
   return 0;
}
 


Название: Re: Реализация простейшего lock-free кольцевого буф
Отправлено: Old от Декабрь 06, 2017, 11:51
Какой сценарий поведения приводит к ошибке?

Код
C++ (Qt)
   void push ()
   {
       uint32_t pos = ( m_write_head++ ) % MaxCount;
       while( m_packet[ pos ].m_is_exists );
       // <<<<<< здесь нитка может переключиться, а когда она получит управление обратно, уже не известно что будет в m_is_exists
       m_packet[ pos ].m_is_exists = true;
   }
 


Название: Re: Реализация простейшего lock-free кольцевого буф
Отправлено: Old от Декабрь 06, 2017, 11:51
то работа с ними получилась не атомарной.
Точно. :)


Название: Re: Реализация простейшего lock-free кольцевого буф
Отправлено: ssoft от Декабрь 06, 2017, 12:07
Цитировать
Код
C++ (Qt)
// <<<<<< здесь нитка может переключиться, а когда она получит управление обратно, уже не известно что будет в m_is_exists
Это то понятно). У писателя и читателя конфликта не будет.
Я проморгал, что буфер круговой). Если происходит переключение, то в первоначальном варианте другой писатель или даже несколько может по кругу дойти до этой позиции и записать туда значение.


Название: Re: Реализация простейшего lock-free кольцевого буф
Отправлено: ssoft от Декабрь 06, 2017, 13:25
Метод compare_exchange_weak/strong не такой уж атомарный(. Значение expected может быть заменено на содержимое во время exchange.[\s]

По сути происходит две операции:[\s]
* проверка, что внутреннее значение равно expected;[\s]
<< здесь может быть переключение контекста[\s]
* выполнение атомарной операции exchange, результат которой помещается в expected.[\s]

Если сравнивать с Qt, то в методы testAndSet* являются атомарными, без возможности переключения контекста внутри.[\s]
Как бы реализовать аналог средствами std?[\s]


Название: Re: Реализация простейшего lock-free кольцевого буф
Отправлено: ssoft от Декабрь 06, 2017, 13:59
Все-таки нужно документацию лучше читать))).

Compares the contents of the atomic object's contained value with expected:
- if true, it replaces the contained value with val (like store).
- if false, it replaces expected with the contained value .

если метод вернул false, то в переменную expected нужно заново присвоить тестовому значению, иначе при следующем вызове expected будет совсем не тем, чем ожидается.

Сам метод атомарный, мое замечание выше неверное.


Название: Re: Реализация простейшего lock-free кольцевого буф
Отправлено: Igors от Декабрь 06, 2017, 15:37
Ну одним bool тут не отделаться, давайте int
Код
C++ (Qt)
enum {
flag_busy = 1,  // есть чтение или запись
flag_data = 2,  // есть данные
};
 
void Buffer::push( int data )
{
uint32_t pos = ( m_write_head++ ) % MaxCount;
 
// захватываем cвободную ячейку
 while (true) {
   int expected = 0;
   if (m_packet[pos].m_is_exists.compare_exchange_weak(expected, flag_busy)) break;
   // yield ? abortFlag ?
 }
 
// пишем данные
  m_packet[pos].m_data = data;
 
// освобождаем ячейку ставя ей флаг "есть данные"
  m_packet[pos].m_exists = flag_data;
}
 
int Buffer::pop( void )
{
uint32_t pos = (m_read_head++) % MaxCount;
 
// захватываем ячейку c данными
 while (true) {
   int expected = flag_data;
   if (m_packet[pos].m_is_exists.compare_exchange_weak(expected, flag_busy)) break;
   // yield ? abortFlag ?
 }
 
// читаем данные
  int data = m_packet[pos].m_data;
 
// освобождаем ячейку ставя ей флаг 0 (свободна)
  m_packet[pos].m_exists = 0;
 
  return data;
}
Ну и в данном случае связываться с lock-free не очень "рентабельно" - атомарный лок и все дела


Название: Re: Реализация простейшего lock-free кольцевого буф
Отправлено: ssoft от Декабрь 06, 2017, 15:39
Получился такой рабочие варианты, кому интересно.

Значения с атомарными флагами состояний.

Код
C++ (Qt)
struct Buffer
{
   enum { MaxCount = 0x0400 };
   enum class Status : uint8_t  { Empty, Proceed, Complete };
 
   struct Packet
   {
       Data m_value;
       std::atomic< Status > m_status;
       Packet () : m_value(), m_status() {}
   };
 
   std::atomic< uint32_t > m_write_head;
   std::atomic< uint32_t > m_read_head;
   std::atomic< uint32_t > m_count;
   Packet m_packet[ MaxCount ];
 
   bool push ( const Data & value )
   {
       if ( m_count >= MaxCount )
           return false;
 
       uint32_t pos = ( m_write_head++ ) % MaxCount;
       Status expected = Status::Empty;
       while( !m_packet[ pos ].m_status.compare_exchange_weak( expected, Status::Proceed ) )
           expected = Status::Empty;
       m_packet[ pos ].m_value = value;
       m_packet[ pos ].m_status = Status::Complete;
       ++m_count;
       return true;
   }
 
   bool pop ( Data & value )
   {
       if ( m_count == 0 )
           return false;
       uint32_t pos = ( m_read_head++ ) % MaxCount;
       Status expected = Status::Complete;
       while( !m_packet[ pos ].m_status.compare_exchange_weak( expected, Status::Proceed ) )
           expected = Status::Complete;
       value = m_packet[ pos ].m_value;
       //m_packet[ pos ].m_value = Data();
       m_packet[ pos ].m_status = Status::Empty;
       --m_count;
       return true;
   }
};
 
 
static Buffer global_buffer;
static std::atomic< uint32_t > write_count;
static std::atomic< uint32_t > read_count;
 
void pushValue ()
{
   Data value;
   while( !global_buffer.push( value ) );
   ++write_count;
}
 
void popValue ()
{
   Data value;
   while( !global_buffer.pop( value ) );
   ++read_count;
}
 
void write ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       pushValue();
}
 
void read ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       popValue();
}
 
int main ( int /*argc*/, char */*argv*/[] )
{
   std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
 
   std::thread first_writer( write, 10000000 );
   std::thread second_writer( write, 20000000 );
   std::thread reader( read, 30000000 );
 
   first_writer.join();
   second_writer.join();
   reader.join();
 
   std::chrono::steady_clock::time_point end= std::chrono::steady_clock::now();
 
   std::cout << "Write count: " << write_count.load() << std::endl;
   std::cout << "Read count: " << read_count.load() << std::endl;
   std::cout << "Time elapsed: " << std::chrono::duration_cast< std::chrono::milliseconds >(end - begin).count() << "ms" << std::endl;
 
   return 0;
}
 

Через указатель на данные

Код
C++ (Qt)
typedef Data * DataPtr;
 
struct Buffer
{
   enum { MaxCount = 0x0004 };
 
   struct Packet
   {
       std::atomic< DataPtr > m_ptr;
       Packet () : m_ptr() {}
   };
 
   std::atomic< uint32_t > m_write_head;
   std::atomic< uint32_t > m_read_head;
   std::atomic< uint32_t > m_count;
   Packet m_packets[ MaxCount ];
 
   bool push ( DataPtr value )
   {
       if ( m_count >= MaxCount )
           return false;
       uint32_t pos = ( m_write_head++ ) % MaxCount;
       DataPtr expected = nullptr;
       while( !m_packets[ pos ].m_ptr.compare_exchange_weak( expected, value ) )
           expected = nullptr;
       ++m_count;
       return true;
   }
 
   bool pop ( DataPtr & value )
   {
       if ( m_count == 0 )
           return false;
       uint32_t pos = ( m_read_head++ ) % MaxCount;
       while( ( value = m_packets[ pos ].m_ptr.exchange( nullptr ) ) == nullptr );
       --m_count;
       return true;
   }
};
 
 
static Buffer global_buffer;
static std::atomic< uint32_t > write_count;
static std::atomic< uint32_t > read_count;
 
void pushValue ()
{
   DataPtr value = new Data;
   while( !global_buffer.push( value ) );
   ++write_count;
}
 
void popValue ()
{
   DataPtr value = nullptr;
   while( !global_buffer.pop( value ) );
   delete value;
   ++read_count;
}
 
void write ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       pushValue();
}
 
void read ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       popValue();
}
 
int main ( int /*argc*/, char */*argv*/[] )
{
   std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
 
   std::thread first_writer( write, 10000000 );
   std::thread second_writer( write, 20000000 );
   std::thread reader( read, 30000000 );
 
   first_writer.join();
   second_writer.join();
   reader.join();
 
   std::chrono::steady_clock::time_point end= std::chrono::steady_clock::now();
 
   std::cout << "Write count: " << write_count.load() << std::endl;
   std::cout << "Read count: " << read_count.load() << std::endl;
   std::cout << "Time elapsed: " << std::chrono::duration_cast< std::chrono::milliseconds >(end - begin).count() << "ms" << std::endl;
 
   return 0;
}
 

Второй вариант медленее чем первый почти в 1.5-а раза из-за new/delete, но не требует наличие флагов состояния.


Название: Re: Реализация простейшего lock-free кольцевого буф
Отправлено: ssoft от Декабрь 06, 2017, 17:06
Ну и в данном случае связываться с lock-free не очень "рентабельно" - атомарный лок и все дела

Если делать атомарный lock, тогда нельзя будет конкурентно писать и читать данные. Буфер будет работать псевдопоследовательно - кто-то один пишет или читает. Здесь же конкурентное чтение и запись возможна. Другое дело, что здесь постоянный сброс кеша CPU происходит, но это лишь простейшая реализация, так - "пощупать")).


Название: Re: Реализация простейшего lock-free кольцевого буф
Отправлено: Igors от Декабрь 07, 2017, 07:44
Второй вариант медленее чем первый почти в 1.5-а раза из-за new/delete, но не требует наличие флагов состояния.
По указателю - это классика lock-free  :)

Да, и вот
Код
C++ (Qt)
       if ( m_count >= MaxCount )
           return false;
 
В multi-threading такие проверки - мертвому припарка. Нужны TryPush/TryPop которые возвращают управление немедленно если захват не удался. Вообще основная работа - организовать эффективное ожидание, остальное - приятное баловство  :)

Если делать атомарный lock, тогда нельзя будет конкурентно писать и читать данные. Буфер будет работать псевдопоследовательно - кто-то один пишет или читает. Здесь же конкурентное чтение и запись возможна.
Ну это скорее "моральное удовлетворение", эффективность с простецким локом очень приличная, а код резко упрощается, можно использовать обычный стек или вектор.

Другое дело, что здесь постоянный сброс кеша CPU происходит, но это лишь простейшая реализация, так - "пощупать")).
Я впервые вижу compare_exchange_weak (пользуюсь tbb и Qt), но вроде бы справочник намекает что как раз синхронизации кешей не ждем (поэтому и weak). Или я не так понял?


Название: Re: Реализация простейшего lock-free кольцевого буф
Отправлено: ssoft от Декабрь 07, 2017, 08:32
Я впервые вижу compare_exchange_weak (пользуюсь tbb и Qt), но вроде бы справочник намекает что как раз синхронизации кешей не ждем (поэтому и weak). Или я не так понял?

Конечно нужно подробнее поизучать этот вопрос, но если пофантазировать))), то ...
В случае weak могут быть ложные срабатывания такого рода - даже если реальная память никем не заблокирована и имеет верное значение, операция все-равно может вернуть false.
Возможно, на некоторых архитектурах в таких случаях первоначальная легковесная проверка значения происходит без блокирования (или другие какие принципы).
Но вот для выполнения операции exchange блокирование обязательно и как следствие после успешной операцией обязателен сброс кеша у других CPU.
В случае же strong проверка сразу осуществляется с блокированием.

Но это просто размышления, не более того)).


Название: Re: Реализация простейшего lock-free кольцевого буф
Отправлено: Igors от Декабрь 07, 2017, 08:48
Но вот для выполнения операции exchange блокирование обязательно и как следствие после успешной операцией обязателен сброс кеша у других CPU.
Чего это обязательно? Разве нельзя вернуть false не ожидая синхронизации? Впрочем мои познания в кешах также не отличаются глубиной  :)

А вообще lock-free - увлекательнейший "вынос мозга", хотя часто, увы, непрактичный.

Edit: а, понял о чем Вы. Да, если exchange случился, то новое значение должно быть "для всех". Ну это нормально (или неизбежно), атомic тоже чего-то стоит