Grabduck

Использование ThreadLocal переменных

:

Введение

Вы уже наверно знаете, что поля классов в java бывают статические и не статические. Любое поле класса без модификатора static принадлежит объекту данного класса и создается каждый раз когда создается новый экземпляр класса. Статические переменные(помеченные модификатором static) не принадлежат экземпляру класса и существует всегда в единственном экземпляре независимо от того, сколько экземпляров класса было создано. Появившийся в java 1.2 класс java.lang.ThreadLocalпо сути предоставляет нам ещё одну область жизни объектов, ThreadLocal предоставляет абстракцию над переменными локальными по отношению к потоку испольнения java.lang.Thread. ThreadLocal переменные отличаются от обычных переменных тем, что у каждого потока свой собственный, индивидуально инициализируемый экземпляр переменной, доступ к которой он получает через методы get() или set().

Я в своей практике встречался с четырьмя основными целями применения ThreadLocal переменных:
1. Упрощение API.
2. Cинтаксический сахар.
3. Кеширование непотокобезопасных(non thread safe) ресурсов.
4. Уменьшение области конкуренции между потоками(lock striping).

Упрощение API с помощью ThreadLocal.

Допустим Вы разрабатываете JEE веб приложение, после прохождения аутентификации на странице логина информация о пользователе запоминается в http сессии, и Вам в любой точке кода может понадобится информация о пользователе от которого пришел http запрос.
Наверняка Вы не захотите всю логику помещать в сервлеты и JSP, Вы выделети в приложение несколько слоев(бизнесс логика, доступ к данным и.т.д.), но после распределния ответсвенности по слоям у Вас может возникнуть проблема с тем, что не в каждой точке кода будет доступ к Http сессии, соответсвенно не везде можно будет узнать от какого пользователя пришел запрос.
Встает вопрос? а как проектировать свой API? Добавлять в каждый метод каждого класса дополнительный параметр представляющий данные пользователе?

До появления ThreadLocal это был единственный выход, с появлением же ThreadLocal мы можем привязать данные о пользователи к потоку обработки http запроса, и достать эту информацию в любом месте программы. Для этого нам понадобится зарегистрировать слушателя в контексте веб приложения который будет срабатывать на любой входящий http запрос:

Итак начнем с класса для реализации потокобезопсного контеста пользователя:

package ru.javatalks;

/**
 * @author Vermut
 *
 */
public class SecurityContextHolder {
	
	private static final ThreadLocal<User> threadLocalScope = new  ThreadLocal<>();
	
	public final static User getLoggedUser() {
		return threadLocalScope.get();
	}
	
	public final static void setLoggedUser(User user) {
		threadLocalScope.set(user);
	}

}

Как мы знаем сервера приложений могут выполнять тысячи запросов одновременно и на первый взгляд такой код не потоко безопасен. Ведь действительно мы запоминаем текущего пользователя в статической переменной, а статическая переменная одна на весь класс, и вроде бы как паралельные http запросы должны перетирать данные друг друга.
Но вся фишка ThreadLocal заключается в том что имея всего одну ThreadLocal переменную, мы можем иметь различное значение для каждого из потоков, то есть один поток никогда не прочтет, удалит или не перезатрет данные присвоенные другим потоком. Таким образом несмотря на разделяемую статическую переменную код выше потоко-безопасен.

Хранилище аутентификацционных данных написано, теперь нужно написать и сконфигурировать в web.xml слушателя входящих HTTP запросов, который бы при поступлении запроса присоеденяя данные о пользователе к потоку обработки а по завершении обработки запроса, очищал бы эту информацию.

package ru.javatalks;

import javax.servlet.ServletRequestEvent;
import javax.servlet.ServletRequestListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;

/**
 * @author Vermut
 *
 */
public class AuthentificationPropagationListener implements ServletRequestListener {

	@Override
	public void requestInitialized(ServletRequestEvent event) {
		HttpServletRequest request = (HttpServletRequest) event.getServletRequest();
		HttpSession session = request.getSession(false);
		if (session == null) {
			return;
		}
		User user = (User) session.getAttribute("loged_user");
		SecurityContextHolder.setLoggedUser(user);
	}
	
	@Override
	public void requestDestroyed(ServletRequestEvent event) {
		SecurityContextHolder.setLoggedUser(null);
	}

}

Дело осталось за малым воспользоваться написаными функционалом из любой точки приложения в которой нет доста к http запросу или сессии:
package ru.javatalks;

/**
 * @author Vermut
 *
 */
public class OrderService {
	
	public void createOrder(Order order) {
		User user = SecurityContextHolder.getLoggedUser();
		checkEligibility(user, order.getSum());
		order.setUserId(user.getId())
		...
	}

}

Конечно же функционал реализованный выше редко когда придется писать самому, есть множество библиотек связанных с security в которых это уже реализовано, например spring-security, стоит только иметь в виду что все они будут точно также работать посредством ThreadLocal. Построение API вокруг ThreadLocal широко используется в java enterprise edition и используется не только для ассоциирования контекста безопасности с потоком, но и для других вещей как например транзакции, открытые JPA сессии. Так же хочу заметить что использование ThreadLocal в JEE окружении сопряжено с возникновением многих проблемам и без глубокого понимания платформы jee, многопоточности и механизма загрузки классов от использования ThreadLocal в JEE лучше отказаться. Проблемы порождаемые ThreadLocal переменными в JEE окружении описаны в конце статьи.

2. Cинтаксический сахар или программирование на языке.

ThreadLocal можно использовть для добавления синтаксического сахара в язык java и многие библиотеки этим пользуются, например mybatis:

private String selectPersonSql() { 
    BEGIN(); // Clears ThreadLocal variable 
    SELECT("P.ID, P.USERNAME, P.PASSWORD, P.FULL_NAME"); 
    SELECT("P.LAST_NAME, P.CREATED_ON, P.UPDATED_ON"); 
    FROM("PERSON P"); 
    FROM("ACCOUNT A"); 
    INNER_JOIN("DEPARTMENT D on D.ID = P.DEPARTMENT_ID"); 
    INNER_JOIN("COMPANY C on D.COMPANY_ID = C.ID"); 
    WHERE("P.ID = A.ID"); 
    WHERE("P.FIRST_NAME like ?"); 
    OR(); 
    WHERE("P.LAST_NAME like ?"); 
    GROUP_BY("P.ID"); 
    HAVING("P.LAST_NAME like ?"); 
    OR(); 
    HAVING("P.FIRST_NAME like ?"); 
    ORDER_BY("P.ID"); 
    ORDER_BY("P.FULL_NAME"); 
    return SQL(); 
  }

Код получился легко читаемым, как видно функции BEGIN, SELECT и.т.д. не вызываются ни на одном объекте, то есть они статические и за счет статического импорта появившегося в java 5, вызов таких функций можно осуществлять без префикса класса. Потоко-безопасность достигается за счет того что каждый поток выполнения имеет собственный экземпляр билдера запросов. Конечно следует ожидать, что c появлением лямбд в java 8, использование ThreadLocal в качестве синтаксического сахара потеряет свою актуальность

Кеширование непотокобезопасных(non thread safe) ресурсов.

Однажды делая код ревью одного класса я обнаружил очень интересный баг многопоточности:

public class DateAdapter extends XmlAdapter<String, Date> {

    private static final DateFormat format = new SimpleDateFormat("dd.MM.yyyy");

    @Override
    public String marshal(Date value) throws Exception {
        return format.format(value);
    }

    @Override
    public Date unmarshal(String value) throws Exception {
        return isNullOrEmpty(value)? null: format.parse(value);  
    }

}

Класс использовался как кастомный адаптер для даты в JAXB. Вроде бы простой маленьки класс и в нём негде ошибится, однако есть одно но, класс java.text.SimpleDateFormat не является потоко безопасным, параллельные потоки должны либо синхронизировать доступ к инстансу объекта данного класса, либо отказаться от разделения одного инстанса SimpleDateFormat.
То есть просто создать один экземпляр формата и запомнить в статической переменной нельзя, иначе мы получим мусор на выходе если форматировать даты паралельно из нескольких потоков. Честно говоря в приложении рассчитаном на входящий поток данных 5 тысяч входящих документов в секунду, ни генерировать мусор создавая каждый раз новый экземпляр формата, ни тем более создавать бутылочное горлышко в виде synchronized блоков мне не хотелось, и поскольку стояло жесткое требование по максимуму отказаться от библиотек не входящих в j2se, то есть нельзя было использовать сторонние реализации форматеров то код выше превратился в следующее:
/**
 *
 * @author Vermut
 *
 */
public final class DateAdapter extends XmlAdapter<String, Date> {

    private static final ThreadLocal<DateFormat> THREAD_CACHE = new ThreadLocal<DateFormat> ();

    @Override
    public String marshal(Date value) throws Exception {
        return getFormat().format(value);
    }

    @Override
    public Date unmarshal(String value) throws Exception {
        return isNullOrEmpty(value)? null: getFormat().parse(value);
    }

    private static DateFormat getFormat() {
        DateFormat format = THREAD_CACHE.get();
        if (format == null) {
            format = new SimpleDateFormat("dd.MM.yyyy");
            THREAD_CACHE.set(format);
        }
        return format;
    }

}

Как видно обеспечено кеширование объектов DateFormat без синхронизации. В статье Java Best Practices – DateFormat in a Multithreading Environmentприведены результаты бенчмарков показывающих что такой подход позволяет увеличить производительность парсинга дат до 8 раз по сравнению с созданием каждый раз нового экземпляра формата.

Сужение области конкуренции между потоками(lock striping).

Lock striping техника представления сложного объекта, к которому осуществляется конкуретный доступ в виде отдельных маленьких частей, каждую часть такого объекта можно менять без блокировки целого объекта. Например техника lock striping применена в CuncurrentHashMap - вся коллекция разбита на регионы, и треды при модификации не конкурируют за всю коллекцию целиком, они конкурируют за её отдельные регионы, таким образом острота конкуренции снижается.

Прежде всего для этого параграфа хотелось бы сразу поместить disclaimer и cсылку на эту презентацию
java8 новинки в java.util.concurrent, в презентации авторитетные специалисты в области java оптимизации в квалификации которых не возникает ни каких сомнений, крайне не рекомендуют использовать ThreadLocal для сuncurrent оптимизаций.

Однако пока java8 ещё не зарелизена, а в продакшн энтерпрайз приложений java8 попадет вообще не скоро, то пример использования ThreadLocal я всё же опубликую.

И так представим высоконагруженное приложение с тысячами рабочих потоков. Потоки занимаются тем что обрабатывают пачки входящих документов и нам бы хотелось собирать некоторую статистику о работе приложения, а конкретно количество обработанных документов. Но при этом мы бы хотели чтобы сбор статистики обходился нам бесплатно, и не вносил бы лишних нагрузку в приложение.
Какие сть варианты решения:
Выполнять запрос SELECT COUNT(*) FROM TABLE в базу данных - на таких объемах не очень удачное решение.
Заводить AtomicLong - поможет на небольшом количестве потоков, но не на тысячах.

Итак применим ThreadLocal. Общее количество обработанных сервером документов можно представить как сумму обработанных документов каждым рабочим потоком. То есть к каждому треду можно поставить соответсвии число, и он будет увеличивать это число вообще без конкуренции с каким либо другим потоком, то есть как мы и хотели сбор статистики обходится нам бесплатно.

package ru.javatalks;

import java.util.ArrayList;
import java.util.List;

/**
 * @author Vermut
 *
 */
package ru.javatalks;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @author Vermut
 *
 */
public final class ThreadLocalAdder {
	
	private final ThreadLocal<SumContainer> threadLocalScope = new ThreadLocal<>();
	private final List<SumContainer> allThreadSums = new ArrayList<>();
	private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
	
	public void add(long value) {
		SumContainer threadLocalSum = threadLocalScope.get();
		if (threadLocalSum == null) {
			threadLocalSum = new SumContainer();
			threadLocalScope.set(threadLocalSum);
			/*
			 * Самое первое получение локального блокирует подсчет общей суммы,
			 *  но это критично только пока приложение не войдет в рабочий ритм, 
                          * в    разогнавшемся приложении уже каждый поток хоть раз да проинкрементил свой счетчик.
			 */
			readWriteLock.writeLock().lock();
			try {
				allThreadSums.add(threadLocalSum);
			} finally {
				readWriteLock.writeLock().unlock();
			}
		}
		threadLocalSum.value += value;
	}
	
	public long getSum() {
		long sum = 0L;
		/*
		 * Как видно подсчет суммы медленная операция, так как сумма не хранится в готовом для чтения виде,
		 * что в принципе вписывается в концепцию lock striping, и не является критичным моментом в данном конкретном случае,
		 * так как админы приложения могут и несколько дней не заглядывать в перфоманс монитор.
		 */ 
		readWriteLock.readLock().lock();
		try {
			for (SumContainer threadLocalSum : allThreadSums) {
				sum += threadLocalSum.value;
			}
		} finally {
			readWriteLock.readLock().unlock();
		}
		return sum;
	}
	
	private static class SumContainer {
		/*
		 * Поле специально не volatile и не атомик, потому что абсолютная точность не нужна,
		 * допустимо чтобы запрос на получение статиcтики не увидел последних инкрементов сделанных рабочим потоком.
		 * Таким образом мы добились чего хотели, сбор статистики сделан бесплатным, его стоимость равна чтению из
		 * обычного (несинхронизированного) хешмапа коим по сути является ThreadLocal
		 */
		public long value;
		
	}

}

Для сокращения кода и упрощения понимания концепции ThreadLocal, из кода удалена обработка ситуаций внезапной смерти потоков. По хорошему нужно обрабатывать ситуацию, когда ThreadLocal собирается сборщиком мусора потому, что поток которому она принадлежит более недостижим из корня, но чтобы не отвлекать внимание читателя в сторону PhantomReference обработка этих ситуаций из кода удалена, а аспекты взаимодействия ThreadLocal и сборки мусора вынесены в отдельный параграф.

А вот небольшой тест показывающий применение такого аккомулятора:

package ru.javatalks;

/**
 * @author Vermut
 *
 */
public class ThreadLocalAdderTest {
	
	private static final int OBSERVER_SLEEP_TIMEOUT = 1000;
	private static final long ITERATION_COUNT_PEER_THREAD = 1_000_000_000l;
	private static final int THREAD_COUNT = 10;
	private static final long INCREMENT_COUNT = 10;
	
	
	public static void main(String[] args) throws InterruptedException {
		ThreadLocalAdder threadLocalAdder = new ThreadLocalAdder();
		AdderThread[] threads = new AdderThread[THREAD_COUNT];
		for (int i = 0; i < THREAD_COUNT; i++) {
			threads[i] = new AdderThread(threadLocalAdder);
			threads[i].start();
		}
		int aliveThreadCount = THREAD_COUNT;
		long previousSum = 0l;
		long sum = threadLocalAdder.getSum();
		long sumNotChangedBeetweenIterationCount = 0l;
		long iterationCount = 0;
		while (aliveThreadCount > 0) {
			Thread.sleep(OBSERVER_SLEEP_TIMEOUT);
			iterationCount ++;
			aliveThreadCount = 0;
			for (AdderThread thread : threads) {
				if (thread.isAlive()) {
					aliveThreadCount ++;
				}
			}
			sum = threadLocalAdder.getSum();
			if (previousSum == sum) {
				sumNotChangedBeetweenIterationCount++;
			}
			System.out.println(iterationCount + " : " + sum + " : " +  sumNotChangedBeetweenIterationCount);
			previousSum = sum;
		}
		System.out.println(sum == ITERATION_COUNT_PEER_THREAD * THREAD_COUNT * INCREMENT_COUNT);
	}
	
	private static final class AdderThread extends Thread {
		
		private ThreadLocalAdder adder;

		public AdderThread(ThreadLocalAdder adder) {
			this.adder = adder;
		}
		
		@Override
		public void run() {
			for (long i = 0; i < ITERATION_COUNT_PEER_THREAD; i++) {
				adder.add(INCREMENT_COUNT);
			}
		}
	}

}