Implementando Streams Cooperativos Simples em C

Olá, Habr!



Obrigado por sua atenção ao nosso post traduzido anterior no REST . Hoje, propomos olhar para o tópico de design de sistema de um ângulo ligeiramente diferente e publicar uma tradução de um artigo de Stephen Brennan, um luminar do Linux, que fala sobre sua própria implementação de multitarefa no espaço do usuário e como isso pode ser benéfico.



A multitarefa, como muitos outros recursos fornecidos pelo sistema operacional, não apenas consideramos algo natural, mas também a percebemos como algo comum. Com nossos poderosos computadores e smartphones, a ideia de um computador não ser capaz de fazer malabarismos com centenas de processos parece estranha. Acho que essa é uma das possibilidades que tornou os computadores tão úteis, mas é por isso que eles são tão complexos, às vezes parece mágica.



É difícil mexer com o código que implementa multitarefa, e nem sempre está claro em quais casos é melhor implementá-lo você mesmo, de modo que você não precise escrever um sistema operacional inteiro. Tenho certeza de que é impossível compreender totalmente o fenômeno até que você mesmo perceba. Portanto, decidi escrever um artigo no qual direi como você pode brincar com uma implementação simples de threading. Neste artigo, implementaremos fluxos simples em um

programa C normal (não em um sistema operacional).



Uma digressão lírica sobre setjmp e longjmp



O agendador será altamente dependente das funções setjmp()e longjmp(). Eles parecem um pouco mágicos, então primeiro descreverei o que eles fazem e depois irei tomar um pouco de tempo e dizer exatamente como.



A função setjmp()permite registrar informações sobre em que estágio de execução o programa se encontrava, para que mais tarde você possa pular novamente para este ponto. Uma variável de tipo é passada para ele jmp_buf, na qual armazenaremos essas informações. Na primeira vez que ela retorna, a função setjmp()retorna 0.



Posteriormente, você pode usar a função longjmp(jmp_buf, value)para retomar instantaneamente a execução do ponto onde foi chamada setjmp(). Em seu programa, essa situação parecerá que setjmp()voltou uma segunda vez. O argumento irá retornar desta vezvalueque você passou longjmp()- é mais conveniente distinguir o segundo retorno do primeiro. Aqui está um exemplo para ilustrar este ponto:



#include <stdio.h>
#include <setjmp.h>

jmp_buf saved_location;
int main(int argc, char **argv)
{
    if (setjmp(saved_location) == 0) {
        printf("We have successfully set up our jump buffer!\n");
    } else {
        printf("We jumped!\n");
        return 0;
    }

    printf("Getting ready to jump!\n");
    longjmp(saved_location, 1);
    printf("This will never execute...\n");
    return 0;
}


Se compilarmos e executarmos este programa, obteremos a seguinte saída:



We have successfully set up our jump buffer!
Getting ready to jump!
We jumped!


Uau! É como uma instrução goto, mas, neste caso, pode até ser usada para pular para fora de uma função. Também é mais difícil de ler do gotoque porque parece uma chamada de função normal. Se o seu código usa setjmp()e em abundância longjmp(), será muito difícil lê-lo para qualquer pessoa (incluindo você).



Como é o caso goto, geralmente é recomendável evitar setjmp()e longjmp(). Mas comogoto, as funções acima podem ser úteis se usadas com moderação e consistência. O planejador precisa ser capaz de alternar contextos, portanto, usaremos esses recursos com responsabilidade. Mais importante ainda, usaremos essas funções de nossa API para que os usuários planejadores não tenham que lidar com esse tipo de complexidade.



Setjmp e longjmp não salvam sua pilha

True, funções setjmp()elongjmp()não se destinam a suportar qualquer salto. Eles foram projetados para um caso prático muito específico. Imagine que você está executando alguma operação complexa, como fazer uma solicitação HTTP. Nesse caso, um conjunto complexo de chamadas de função estará envolvido e, se alguma delas falhar, você terá que retornar um código de erro especial de cada uma delas. Esse código se parecerá com a seguinte listagem, onde quer que você chame a função (talvez dezenas de vezes):



int rv = do_function_call();
if (rv != SUCCESS) {
    return rv;
}


O significado setjmp()e longjmp()é o que setjmp()ajuda a demarcar um lugar antes de embarcar na tarefa realmente difícil. Então você pode centralizar todo o tratamento de erros em um só lugar:



int rv;
jmp_buf buf;
if ((rv = setjmp(buf)) != 0) {
    /*    */
    return;
}
do_complicated_task(buf, args...);


Se alguma função envolvida falhar do_complicated_task(), simplesmente acontecerá longjmp(buf, error_code). Isso significa que cada função na composição do_complicated_task()pode assumir que qualquer chamada de função foi bem-sucedida, o que significa que você não pode colocar esse código para lidar com erros em cada chamada de função (na prática, isso quase nunca é feito, mas este é um tópico para um artigo separado) ...



A ideia básica aqui é que longjmp()só permite que você salte de funções profundamente aninhadas. Você não pode pular para aquela função profundamente aninhada da qual você pulou antes. É assim que a pilha se parece quando você sai da função. Asterisco (*) significa o ponteiro da pilha no qual está armazenado setjmp().



  | Stack before longjmp    | Stack after longjmp
      +-------------------------+----------------------------
stack | main()              (*) | main()
grows | do_http_request()       |
down  | send_a_header()         |
 |    | write_bytes()           |
 v    | write()  - fails!       |


Como você pode ver, você só pode retroceder na pilha, portanto, não há perigo de corrupção de dados. Por outro lado, imagine como seria se você quisesse pular entre as tarefas. Se você ligar setjmp()e depois voltar, fazer outras coisas e tentar retomar o trabalho que já vinha fazendo antes, surgirá um problema:



      | Stack at setjmp() | Stack later      | Stack after longjmp()
      +-------------------+------------------+----------------------
stack | main()            | main()           | main()
grows | do_task_one()     | do_task_two()    | do_stack_two()
down  | subtask()         | subtask()        | subtask()
 |    | foo()             |                  | ???
 v    | bar()         (*) |              (*) | ???               (*)


O ponteiro da pilha salvo setjmp()apontará para um quadro de pilha que não existe mais e pode ter sido substituído por outros dados em algum momento. Se tentarmos longjmp()voltar para a função da qual retornamos com ajuda , coisas muito estranhas começarão e podem levar ao colapso de todo o programa.



Moral: se você for usar setjmp()e longjmp()pular entre tarefas complexas como essas, deve garantir que cada tarefa tenha sua própria pilha separada. Nesse caso, o problema é completamente eliminado, pois quando longjmp()o ponteiro da pilha for zerado, o próprio programa substituirá a pilha pela desejada e não ocorrerá o apagamento da pilha.



Vamos escrever uma API de agendador



A digressão é um pouco longa, mas armados com o que aprendemos, agora podemos implementar fluxos de espaço do usuário. Para começar, observo que é muito útil projetar você mesmo a API para inicializar, criar e iniciar threads. Se fizermos isso com antecedência, teremos um entendimento muito melhor do que exatamente estamos tentando construir!



void scheduler_init(void);
void scheduler_create_task(void (*func)(void*), void *arg);
void scheduler_run(void);


Essas funções serão usadas para inicializar o agendador, adicionar tarefas e, finalmente, iniciar as tarefas no agendador. Uma vez iniciado, ele scheduler_run()será executado até que todas as tarefas sejam concluídas. As tarefas em execução no momento terão as seguintes APIs:



void scheduler_exit_current_task(void);
void scheduler_relinquish(void);


A primeira função é responsável por sair da tarefa. Sair da tarefa também é possível ao retornar de sua função, portanto, essa construção existe apenas por conveniência. A segunda função descreve como nossos threads dirão ao planejador que outra tarefa deve ser executada por um tempo. Quando uma tarefa é chamada scheduler_relinquish(), ela pode ser temporariamente suspensa enquanto outras tarefas estão em execução; mas, eventualmente, a função retornará e a primeira tarefa poderá continuar.



Para obter um exemplo concreto, vamos considerar um caso de uso hipotético para nossa API, com o qual testaremos o agendador:



#include <stdlib.h>
#include <stdio.h>

#include "scheduler.h"

struct tester_args {
    char *name;
    int iters;
};

void tester(void *arg)
{
    int i;
    struct tester_args *ta = (struct tester_args *)arg;
    for (i = 0; i < ta->iters; i++) {
        printf("task %s: %d\n", ta->name, i);
        scheduler_relinquish();
    }
    free(ta);
}

void create_test_task(char *name, int iters)
{
    struct tester_args *ta = malloc(sizeof(*ta));
    ta->name = name;
    ta->iters = iters;
    scheduler_create_task(tester, ta);
}

int main(int argc, char **argv)
{
    scheduler_init();
    create_test_task("first", 5);
    create_test_task("second", 2);
    scheduler_run();
    printf("Finished running all tasks!\n");
    return EXIT_SUCCESS;
}


Neste exemplo, criamos duas tarefas que executam a mesma função, mas usam argumentos diferentes; assim, sua implementação pode ser rastreada separadamente. Cada tarefa executa um número definido de iterações. A cada iteração, ele imprime uma mensagem e permite que outra tarefa seja executada. Esperamos ver algo como a saída do programa:



task first: 0
task second: 0
task first: 1
task second: 1
task first: 2
task first: 3
task first: 4
Finished running all tasks!


Vamos implementar a API do planejador



Para implementar a API, precisamos de algum tipo de representação interna da tarefa. Então, vamos ao que interessa; Vamos coletar os campos de que precisamos:



struct task {
    enum {
        ST_CREATED,
        ST_RUNNING,
        ST_WAITING,
    } status;

    int id;

    jmp_buf buf;

    void (*func)(void*);
    void *arg;

    struct sc_list_head task_list;

    void *stack_bottom;
    void *stack_top;
    int stack_size;
};


Vamos discutir cada um desses campos separadamente. Todas as tarefas criadas devem estar no estado "criado" antes da execução. Quando uma tarefa começa a ser executada, ela entra no estado “em execução” e, se a tarefa precisar esperar por alguma operação assíncrona, ela pode ser colocada no estado “em espera”. Um campo idé simplesmente um identificador exclusivo para uma tarefa. Este bufcontém informações sobre quando a longjmp()tarefa será executada para retomar. Os campos funce argsão passados scheduler_create_task()e necessários para executar a tarefa. O campo é task_listobrigatório para implementar uma lista duplamente vinculada de todas as tarefas. Os campos stack_bottom, stack_tope stack_sizetodos pertencem a uma pilha separada dedicada especificamente para esta tarefa. Abaixo está o endereço retornado pormalloc()mas "top" é um ponteiro para o endereço diretamente acima da região especificada na memória. Visto que a pilha x86 cresce para baixo, você precisa definir o ponteiro da pilha para um valor stack_top, não stack_bottom.



Nessas condições, você pode implementar a função scheduler_create_task():



void scheduler_create_task(void (*func)(void *), void *arg)
{
    static int id = 1;
    struct task *task = malloc(sizeof(*task));
    task->status = ST_CREATED;
    task->func = func;
    task->arg = arg;
    task->id = id++;
    task->stack_size = 16 * 1024;
    task->stack_bottom = malloc(task->stack_size);
    task->stack_top = task->stack_bottom + task->stack_size;
    sc_list_insert_end(&priv.task_list, &task->task_list);
}


Com o uso static int, garantimos que toda vez que a função for chamada, o campo id será incrementado e haverá um novo número. Todo o resto deve ser claro sem explicação, exceto para a função sc_list_insert_end()que simplesmente adiciona struct taskà lista global. A lista global é armazenada dentro de uma segunda estrutura que contém todos os dados privados do planejador. Abaixo está a própria estrutura, bem como sua função de inicialização:



struct scheduler_private {
    jmp_buf buf;
    struct task *current;
    struct sc_list_head task_list;
} priv;

void scheduler_init(void)
{
    priv.current = NULL;
    sc_list_init(&priv.task_list);
}


O campo é task_listusado para se referir a uma lista de tarefas (não surpreendentemente). O campo currentarmazena a tarefa que está sendo executada no momento (ou null, se não houver tais tarefas no momento). Mais importante ainda, o campo bufserá usado para saltar para o código scheduler_run():



enum {
    INIT=0,
    SCHEDULE,
    EXIT_TASK,
};

void scheduler_run(void)
{
    /*     ! */
    switch (setjmp(priv.buf)) {
    case EXIT_TASK:
        scheduler_free_current_task();
    case INIT:
    case SCHEDULE:
        schedule();
        /*       ,    */
        return;
    default:
        fprintf(stderr, "Uh oh, scheduler error\n");
        return;
    }
}


Assim que a função é chamada scheduler_run(), configuramos o buffer setjmp()para que possamos sempre retornar a essa função. Na primeira vez, 0 (INIT) é retornado e chamamos imediatamente schedule(). Posteriormente, podemos passar constantes SCHEDULE ou EXIT_TASK longjmp(), o que provocará diferentes comportamentos. Por enquanto, vamos ignorar o caso EXIT_TASK e pular direto para a implementação schedule():



static void schedule(void)
{
    struct task *next = scheduler_choose_task();

    if (!next) {
        return;
    }

    priv.current = next;
    if (next->status == ST_CREATED) {
        /*
         *     .   
         * ,        .
         */
        register void *top = next->stack_top;
        asm volatile(
            "mov %[rs], %%rsp \n"
            : [ rs ] "+r" (top) ::
        );

        /*
         *   
         */
        next->status = ST_RUNNING;
        next->func(next->arg);

        /*
         *     ,    .   – ,   
         *   
         */
        scheduler_exit_current_task();
    } else {
        longjmp(next->buf, 1);
    }
    /*   */
}


Primeiro, chamamos a função interna para selecionar a próxima tarefa a ser executada. Este agendador funcionará como um carrossel normal, portanto, simplesmente selecionará uma nova tarefa da lista. Se esta função retornar NULL, não temos mais tarefas para realizar e retornamos. Caso contrário, devemos iniciar a execução da tarefa (se estiver no estado ST_CREATED) ou retomar sua execução.



Para executar a tarefa criada, usamos a instrução de montagem para x86_64 para atribuir o campo a um stack_topregistro rsp(ponteiro de pilha). Em seguida, alteramos o estado da tarefa, executamos a função e saímos. Observe que setjmp()ambos longjmp()armazenam e reorganizam os ponteiros da pilha, então aqui só temos que usar o assembler para alterar o ponteiro da pilha.



Se a tarefa já foi iniciada, o campo bufdeve conter o contexto de que precisamos longjmp()para retomar a tarefa, então é isso que fazemos.

A seguir, vamos examinar uma função auxiliar que seleciona a próxima tarefa a ser executada. Este é o coração do planejador e, como eu disse antes, este planejador funciona como um carrossel:



static struct task *scheduler_choose_task(void)
{
    struct task *task;

    sc_list_for_each_entry(task, &priv.task_list, task_list, struct task)
    {
        if (task->status == ST_RUNNING || task->status == ST_CREATED) {
            sc_list_remove(&task->task_list);
            sc_list_insert_end(&priv.task_list, &task->task_list);
            return task;
        }
    }

    return NULL;
}


Se você não está familiarizado com a implementação da minha lista vinculada (retirada do kernel do Linux), não é grande coisa. Uma função sc_list_for_each_entry()é uma macro que permite iterar sobre todas as tarefas na lista de tarefas. A primeira tarefa selecionável (não em um estado pendente) que encontramos é removida de sua posição atual e movida para o final da lista de tarefas. Isso garante que na próxima vez que executarmos o planejador, receberemos uma tarefa diferente (se houver). Retornamos a primeira tarefa disponível para seleção ou NULL se não houver nenhuma tarefa.



Finalmente, vamos prosseguir para a implementação scheduler_relinquish()para ver como a tarefa pode se auto-eliminar:



void scheduler_relinquish(void)
{
    if (setjmp(priv.current->buf)) {
        return;
    } else {
        longjmp(priv.buf, SCHEDULE);
    }
}


Este é outro caso de uso para a função setjmp()em nosso planejador. Basicamente, essa opção pode parecer um pouco confusa. Quando a tarefa chama esta função, nós a usamos setjmp()para salvar o contexto atual (incluindo o ponteiro da pilha real). Em seguida, usamos longjmp()para entrar no escalonador (novamente em scheduler_run()) e passar a função SCHEDULE; portanto, pedimos que você atribua uma nova tarefa.



Quando a tarefa é retomada, a função setjmp()retorna com um valor diferente de zero e saímos de qualquer tarefa que estivéssemos fazendo antes!

Finalmente, eis o que acontece quando uma tarefa é encerrada (isso é feito explicitamente, chamando a função de saída ou retornando da função de tarefa correspondente):



void scheduler_exit_current_task(void)
{
    struct task *task = priv.current;
    sc_list_remove(&task->task_list);
    longjmp(priv.buf, EXIT_TASK);
    /*   */
}

static void scheduler_free_current_task(void)
{
    struct task *task = priv.current;
    priv.current = NULL;
    free(task->stack_bottom);
    free(task);
}


Este é um processo de duas partes. A primeira função é retornada diretamente pela própria tarefa. Estamos removendo a entrada correspondente a esta da lista de tarefas, uma vez que não será mais atribuída. Depois, usando, longjmp()voltamos à função scheduler_run(). Desta vez, usamos EXIT_TASK. É assim que informamos ao planejador o que ele deve chamar antes de atribuir uma nova tarefa scheduler_free_current_task(). Se você voltar à descrição scheduler_run(), verá que é exatamente isso o que ela faz scheduler_run().



Fizemos isso em duas etapas, desde quandoscheduler_exit_current_task(), ele usa ativamente a pilha contida na estrutura da tarefa. Se você liberar a pilha e continuar a usá-la, há uma chance de que a função seja capaz de acessar toda a mesma pilha de memória que acabamos de liberar! Para garantir que isso não aconteça, teremos que longjmp()reverter para o agendador usando uma pilha separada com ajuda . Então, podemos liberar com segurança os dados relacionados à tarefa.



Portanto, analisamos toda a implementação do escalonador inteiramente. Se tentássemos compilá-lo adicionando minha implementação de lista vinculada e o programa principal acima a ela, obteríamos um planejador totalmente funcional! Para não incomodar você com copiar e colar, direciono você para o repositório no github , que contém todo o código deste artigo.



Qual é a utilidade da abordagem descrita?



Se você leu até agora, acho que não há necessidade de convencê-lo de que o exemplo é interessante. Mas, à primeira vista, pode não parecer muito útil. Afinal, em C você pode usar threads "reais" que podem ser executados em paralelo e não precisam esperar uma pela outra até que uma delas chame scheduler_relinquish().



No entanto, vejo isso como um ponto de partida para uma série de implementações interessantes de recursos úteis. Podemos falar sobre tarefas pesadas de I / O, sobre a implementação de um aplicativo assíncrono de thread único, da mesma forma que funcionam os novos utilitários assíncronos em Python. Geradores e co-rotinas também podem ser implementados usando tal sistema. Finalmente, com algum trabalho árduo, este sistema também pode se tornar amigo dos threads "reais" do sistema operacional para fornecer simultaneidade adicional quando necessário. Um projeto interessante está escondido por trás de cada uma dessas idéias, e eu recomendo que você tente concluir uma delas sozinho, caro leitor.



É seguro?



Acho que é mais provável que não do que sim! Não é seguro considerar o código de assembly embutido que afeta o ponteiro da pilha. Não arrisque usar essas coisas na produção, mas certifique-se de mexer com elas e pesquisar!



Uma implementação mais segura de tal sistema pode ser construída usando uma API "não contextual" (veja man getcontext), que permite a você alternar entre esses tipos de "fluxos" de espaço do usuário sem incorporar código de montagem. Infelizmente, essa API não é coberta pelos padrões (ela foi removida da especificação POSIX). Mas ainda pode ser usado como parte do glibc.



Como esse mecanismo pode ser deslocado?



Este planejador, conforme apresentado aqui, funciona apenas se os threads transferem explicitamente o controle de volta para o planejador. Isso não é bom para um programa geral, por exemplo, para um sistema operacional, uma vez que um thread mal feito pode bloquear a execução de todos os outros (embora isso não tenha impedido o uso de multitarefa cooperativa no MS-DOS!) Não acho que isso torne a multitarefa cooperativa claramente ruim; tudo depende do aplicativo.



Ao usar uma API “fora de contexto” não padrão, os sinais POSIX armazenarão o contexto do código que foi executado anteriormente. Ao definir o temporizador para emitir um bipe, o planejador do espaço do usuário pode realmente fornecer uma versão funcional de multitarefa preventiva! Este é outro projeto legal que merece um artigo separado.



All Articles