Translate

terça-feira, 17 de dezembro de 2013

Java - Sincronizar referência entre threads

Quando se trata de operar um objeto entre threads, seja escrita e leitura posterior, existe a ocorrência de três situações distintas: processamento paralelo, geração de referências do objeto de origem e a atualização tardia da alteração do objeto porque a JVM não garante imediates nesta questão.

Estas três situações mencionadas, quando combinadas, terminam normalmente num problema de sincronismo que pode interferir em muito a integridade do processamento concorrente (não paralelo apenas) de objetos e deixar qualquer programador ou projetista do campo em questão com bastante dor de cabeça costumeiramente.

O problema

A bateria de classes a seguir compõem uma simulação de um problema de atraso no sincronismo prejudicial para a operação concorrente de objetos. O valor operado é outData do tipo ValueObject, pois a operação final copiar os parâmetros de inData, também do tipo ValueObject, para outdata:

A classe ValueObject é o objeto da operação

public class ValueObject {

private long id;
private String name;

public ValueObject() {
this.id = 0L;
this.name = null;
}

public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override

public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("ValueObject [id=");
builder.append(id);
builder.append(", name=");
builder.append(name);
builder.append("]");
return builder.toString();
}

}

A classe AbstractExecutor implementa Runnable para apoiar a concorrência

public abstract class AbstractExecutor implements Runnable {

private ValueObject inData;
private ValueObject outData;

abstract void execute(ValueObject inData, ValueObject outData);

@Override
public void run() {

this.execute(inData, outData);

this.setOutData(outData);

}

public ValueObject getInData() {
return inData;
}

public void setInData(ValueObject inData) {
this.inData = inData;
}

public ValueObject getOutData() {
return outData;
}

public void setOutData(ValueObject outData) {
this.outData = outData;
}

}

A classe FinalExecutor herda AbstractExecutor para realizar a operação concorrente

public class FinalExecutor extends AbstractExecutor {

@Override
protected void execute(ValueObject inData, ValueObject outData) {

outData.setId(inData.getId());
outData.setName(inData.getName());

}

}

A classe ThreadPoolExecution gera a execução paralela com falha no sincronismo 

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class ThreadPoolExecution <E extends AbstractExecutor> {
private static int executions;
static {
executions = 0;
}

private Class<E> clazz;
private ExecutorService threadPool;
public ThreadPoolExecution(Class<E> clazz) {
this.clazz = clazz;
this.threadPool = Executors.newFixedThreadPool(2);
}

public void execute(ValueObject inData, ValueObject  outData) {
try {
E executor = (E) this.clazz.newInstance();
executor.setInData(inData);
executor.setOutData(outData);
executions++;
threadPool.execute(executor);
if (outData.getId() == 0L) {
System.out.println(String.format("Não alterado o objeto da execução %d: %s", executions, outData.toString()));
} else {
System.out.println("ok");
}

} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}

}

A classe MainTest executa em série, mas o resultado é o processamento paralelo porque tem como executor de FinalExecutor o ThreadPoolExecution 

public class MainTest {

public static void main(String[] args) {
ThreadPoolExecution<FinalExecutor> tpe = new ThreadPoolExecution<FinalExecutor>(FinalExecutor.class);
for (long count = 1L; count <= 1000; count++) {
ValueObject inData = new ValueObject();
ValueObject outData = new ValueObject();
inData.setId(count);
inData.setName(String.format("Named %d", count));
tpe.execute(inData, outData);

}
}
}

Resultado com erro

Não alterado o objeto da execução 1: ValueObject [id=1, name=Named 1]
Não alterado o objeto da execução 2: ValueObject [id=0, name=null]
...
Não alterado o objeto da execução 997: ValueObject [id=0, name=null]
Não alterado o objeto da execução 998: ValueObject [id=0, name=null]
Não alterado o objeto da execução 999: ValueObject [id=0, name=null]
Não alterado o objeto da execução 1000: ValueObject [id=0, name=null]

A solução

A execução em ThreadPoolExecution sempre ocorre em paralelo e na maior velocidade possível para garantir produção porque como o próprio nome diz, é um pool de threads com duas threads executoras. Observe o construtor de ThreadPoolExecution.

Por causa disso, não é obrigação da ThreadPoolExecution garantir a atualização em tempo real de qualquer objeto alterado, pois isso cabe a JVM realizar espontaneamente o sincronismo. No caso "O problema" foi um caso de estudo forçado para a execução em paralelo nunca ser superada pela atualização da referência de outData em todas as classes que a referencia.

Para corrigir o problema, onde se inicia a alteração em paralelo das referência, basta fazer um controle de sincronismo e antes disso, garantir que o objeto a ser operado jamais estará nulo porque o sincronismo só funciona com objetos e não com nulo.

Conforme dito, a correção inicia onde a operação é bifurcada para processamento paralelo e na operação fim. Portanto:

A classe FinalExecutor está corrigida no destaque em amarelo

public class FinalExecutor extends AbstractExecutor {

@Override
protected void execute(ValueObject inData, ValueObject outData) {

synchronized (outData) {

outData.setId(inData.getId());
outData.setName(inData.getName());
outData.notify();
}

}



A classe ThreadPoolExecution também foi corrigida conforme destaque em amarelo

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class ThreadPoolExecution <E extends AbstractExecutor> {
private static int executions;
static {
executions = 0;
}

private Class<E> clazz;
private ExecutorService threadPool;
public ThreadPoolExecution(Class<E> clazz) {
this.clazz = clazz;
this.threadPool = Executors.newFixedThreadPool(2);
}

public void execute(ValueObject inData, ValueObject  outData) {
try {
E executor = (E) this.clazz.newInstance();
executor.setInData(inData);
executor.setOutData(outData);
executions++;
threadPool.execute(executor);
synchronized(outData) {

try {
outData.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (outData.getId() == 0L) {
System.out.println(String.format("Não alterado o objeto da execução %d: %s", executions, outData.toString()));
} else {
System.out.println("ok");
}

}

} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}

}

}

Resultado

1: ok
2: ok
...
997: ok
998: ok
999: ok
1000: ok

Espero com este artigo polpar os desenvolvedores e projetistas de softwares terem de gastar horas para analisar a origem deste problema simples de compreensão, mas altamente complexo de ser percebido no dia-a-dia.

Boa sorte.

Nenhum comentário:

Postar um comentário