클라우드 또는 공유 서버에서 개발하는 경우 동일한 코드베이스와 동일한 데이터베이스에 의존하는 팀원과 함께 작업하는 아름다움을 경험했을 수 있습니다! 실제로 다른 사람들과 동일한 클라우드 환경에서 작업 할 때 특정 리소스 (파일, 테이블…)가 여러 팀원간에 공유되는 경우가 있습니다. 즉, 다른 사람들이 동일한 리소스에서 작업하고 있기 때문에 때때로 자신이 차단 될 수 있습니다. 그리고 필요한 파일 / 테이블을 덮어 쓰거나 이동하거나 삭제하면 실망 스러울 수 있습니다.
공유 작업 영역에 의존하지 않고 지연 시간을 피하지 않고 대부분의 Airflow 작업을 수행하려면 Airflow Docker 컨테이너를 사용하고 그 안에서 구문 및 단위 테스트를 실행합니다.
컴퓨터에 클라우드를 사용하려면!
또는 Python / Airflow 구문으로 모든 것이 정상인지 확인할 수있는 로컬 환경에서 공유 환경에서 수행해야하는 유일한 작업은 코드 동작을 테스트하는 것입니다.
이 로컬 환경이 있으면 Airflow가 파일을 업데이트하고 새로 고치는 것을 30 초 이상 기다리면서 코드를 배포하거나 환경에 복사하는 데 많은 처리를하지 않아도됩니다. 로컬 환경에서는 웹 인터페이스 나 Airflow의 로그를 통하는 것보다 더 빠르게 모든 오류를 볼 수 있습니다. 따라서이 로그를 읽으려면 Stackdriver와 같은 다른 애플리케이션으로 전환해야합니다.
컴퓨터와 CI에서 Airflow를 실행하기 위해 우리는 Docker를 사용할 것입니다. 따라서 처음에는 약간의 구성이지만, 결국에는 클라우드 구성을 재현하고 팀원과 공유 할 수 있습니다. Docker 이미지를 다시 빌드하지 않도록 개인 Docker 레지스트리에 저장할 수 있습니다. 제 관점에서는 가상 환경에 설치하는 것보다 이미 구성된 Airflow를 Docker로 실행하는 것이 더 쉽습니다.
FROM puckel/docker-airflow:1.10.9
USER root
# requirements file with packages you need
COPY requirements.txt .
RUN pip install -r requirements.txt
USER airflow
$ docker build -t my-airflow-img:1.0 .
$ docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
my-airflow-img 1.0 96696eea2641 5 minutes ago 1.89GB
코드에서 Airflow 변수를 사용하는 경우 Fernet 키 (Airflow 용 암호화 키)를 구성해야합니다.
# replace <img> with my-airflow-img or puckel/docker-airflow:1.10.9
$ docker run <img> python -c "from cryptography.fernet import Fernet; FERNET_KEY = Fernet.generate_key().decode(); print(FERNET_KEY)"
R9mdx6wCwIb0h_GChn1-Fbcth3H_gTBAjgvf87JLgSU=
# replace <fkey> (by the one you just produced) and <img>
# your Airflow's variables file is expected to be in dags directory (in your local/host machine) with name vars.json
docker run -d -e FERNET_KEY=<fkey> -e DAGS_DIR='dags' \
--name airflow-web \
-v `pwd`/dags/:/usr/local/airflow/dags \
-p 8080:8080 <img>
docker exec -it airflow-web airflow variables -i ./dags/vars.json
특히 CI / CD 파이프 라인에서 테스트를 실행하는 경우 Bash를 꼬집어 삶을 훨씬 쉽게 만들 수 있습니다.
Airflow가 로컬에서 실행 중이면 브라우저에서로드되는 dags를 볼 수 있습니다 localhost:8080
. 구문 오류가있는 경우 dags 페이지에 표시되며을 사용하여 로그에 쉽게 액세스 할 수 있습니다 docker logs -f airflow-web
.
이제이 환경이 있으므로 몇 줄을 더 추가하면 코드 구문을 테스트 할 수 있습니다 (컴파일 단계처럼 생각할 수 있음). Python 및 Airflow 구문을 확인하기 위해 Python 테스트를 실행하고 (예 : 작업에 task_id가 있습니까?) 몇 초 내에 작동하는지 알 수 있습니다.
먼저 파일을 만들고 dags/test_syntax.py
Python 및 Airflow 패키지를 가져와야합니다.
import os
import unittest # usefull to launch tests and imports
from importlib import util # usefull to import a file
from airflow import models as af_models
class DagsTest(unittest.TestCase):
"""Validate dags syntax."""
def test_load_dags_integrity(self):
"""Import ml dag files and check for DAG syntax."""
root_dag_dir = os.path.join(DAG_DIR, 'load')
files = ['load_mesages.py','load_users.py']
self.dags_import(root_dag_dir, files)
import_dags
기능 시도 모두에 정의 된 파일을 가져 list_dag_files
, 덕분에 _load
그와 함께로드 된 모듈 내 DAG를 찾을 수 있다면, 그리고 검사를 _check_if_there_is_a_dag_in
(함수의 내용의 첫 번째 cicle에서 발견 된 공기 흐름과 데이터 테스트 지옥 ) :
def import_dags(self, dag_dir, list_dag_files):
"""
For each file in list_dag_files, we:
- try to load it to check syntax and
- check if there is a dag defined in the file.
"""
for filename in list_dag_files:
module, module_path = self._load(dag_dir, filename)
self._check_if_there_is_a_dag_in(module, module_path)
@staticmethod
def _load(dag_dir, filename):
module_path = os.path.join(dag_dir, filename)
module_name, _ = os.path.splitext(filename)
mod_spec = util.spec_from_file_location(
module_name, module_path)
module = util.module_from_spec(mod_spec)
mod_spec.loader.exec_module(module)
return module, module_path
@staticmethod
def _check_if_there_is_a_dag_in(module, module_path):
"""Look if there is at least one dag defined in the module."""
assert any(
isinstance(var, af_models.DAG)
for var in vars(module).values())
docker exec -it airflow-web python -m unittest dags/test_syntax.py
무결성 테스트가 있으며 단위 테스트를 쉽게 추가 할 수 있습니다.
먼저 파일을 만들고 dags/test_unit.py
일부 패키지를 가져와야합니다 (mock / patch 덕분에 여기에 Airflow가 필요하지 않습니다!).
import unittest
from unittest.mock import MagicMock, patch
from Training import Training
class TrainingUnitTest(unittest.TestCase):
"""
Unit test for training functions.
"""
def setUp(self):
self.dag = MagicMock(return_value="it would be a dag obj")
self.tr = Training(self.dag)
def test_launch(self):
with patch('airflow.operators.python_operator.PythonOperator') \
as mock_op:
self.tr.launch(usecase='topics_in_messages')
mock_op.assert_called_with(
task_id='training_for_topics_in_messages',
python_callable=self.mi._train_model,
op_kwargs={
'usecase': 'topic_in_messages',
'number_of_clusters': 7},
dag=self.dag
)
이제 다음을 사용하여 단위 테스트를 시작할 수 있습니다.
docker exec -it airflow-web python -m unittest dags/test_unit.py
(예… 꿈이 끝날 때까지….)
따라서 다른 사람에게 의존하지 않고 (동일한 클라우드를 공유하는 경우) 코드에 구문 오류가 있는지 확인하여 더 빠르게 작업 할 수 있으며 단위 테스트를 통해 코드베이스를보다 안전한 방식으로 관리 할 수 있습니다.
가는 길에 흥미로운 리소스를 찾았습니다.
읽어 주셔서 감사합니다 !
훌륭한 데이터 엔지니어와 훌륭한 데이터 엔지니어의 차이점은 무엇입니까? 분산 시스템 개념을 이해하면 올바른 빅 데이터 기술을 선택하고 더 나은 데이터 앱과 파이프 라인을 작성하는 데 도움이 될 수 있습니다. 예를 들어, 자동차를 사는 것을 상상해보십시오.
1911 년 Charles Ranlett Flint는 4 개의 다른 비즈니스 회사의 지주 회사로 International Business Machines (IBM)를 설립했습니다. 미국 정보 기술 회사이며 177 개국에서 운영되고있는 세계 최대의 전자 컴퓨터 회사 및 시스템 통합 업체 중 하나입니다.
Pragmatic Programmer는 1999 년에 처음 출판되었으며 이후 역대 최고의 프로그래밍 책으로 선정되었습니다. 저자 Andy Hunt와 David Thomas는 Agile Manifesto의 원저자 중 하나였으며 몇 가지 심각한 자격을 가지고 있습니다.
공격자가 공개적으로 사용 가능한 GraphQL 인터페이스를 사용하여 사이트를 스크랩하거나 서비스 거부 공격을 실행하는 방법에 대해 알아보십시오. 이들은 4 가지 방법 중 하나로이를 수행 할 수 있습니다. 단일 대형 쿼리를 신중하게 구성하여 실행하고, 관련 데이터를 가져올 수있는 병렬 쿼리를 많이 작성하고, 일괄 요청을 사용하여 많은 쿼리를 연속적으로 실행하고, 마지막으로 많은 요청을 보냅니다.
기술 업계의 직책을 위해 인터뷰 할 때 일반적으로 제안을 고려하기 전에 최소한 3 차례의 인터뷰를 거치게됩니다. 라운드는 일반적으로 다음과 같습니다. 그렇게 생각하면 잘못된 것입니다.
훌륭한 개발자를 만드는 비 기술적 인 것들 나는이 기사를 작성하는 것을 한동안 미루고 있습니다. 나는 그것을 작성할 자격이 있다고 생각하지 못했습니다. 오늘은 쓸 때라고 생각했습니다.