jupytor-client development

Source

GitHub: https://github.com/jupyter/jupyter_client

Document

Jupyter Client: https://media.readthedocs.org/pdf/jupyter-client/latest/jupyter-client.pdf

Dev

aliyun
/docker_python/ipython/jupyter_client/jupyter_client/work

# ls
__init__.py  myclient.py

cat myclient.py


import os
import sys
pjoin = os.path.join
#from unittest import TestCase

from jupyter_client.kernelspec import KernelSpecManager, NoSuchKernel, NATIVE_KERNEL_NAME
from jupyter_client.manager import start_new_kernel
from jupyter_client.tests.utils import test_env

import pytest

from ipython_genutils.py3compat import string_types
from IPython.utils.capture import capture_output

TIMEOUT = 30

class myclient():
    def setUp(self):
        self.env_patch = test_env()
        self.env_patch.start()
        #self.addCleanup(self.env_patch.stop)
        try:
            KernelSpecManager().get_kernel_spec(NATIVE_KERNEL_NAME)
        except NoSuchKernel:
            pytest.skip()
        self.km, self.kc = start_new_kernel(kernel_name=NATIVE_KERNEL_NAME)
        print("kernel:",NATIVE_KERNEL_NAME)
        #self.addCleanup(self.kc.stop_channels)
        #self.addCleanup(self.km.shutdown_kernel)

    
    def _check_reply(self, reply_type, reply):
        #self.assertIsInstance(reply, dict)
        #self.assertEqual(reply['header']['msg_type'], reply_type + '_reply')
        #self.assertEqual(reply['parent_header']['msg_type'], reply_type + '_request')
        print("check_reply()")

    def test_comm_info(self):
        kc = self.kc
        msg_id = kc.comm_info()
        #self.assertIsInstance(msg_id, string_types)
        reply = kc.comm_info(reply=True, timeout=TIMEOUT)
        print("reply",reply)
        self._check_reply('comm_info', reply)

    def stop(self):
        self.kc.stop_channels()
        self.km.shutdown_kernel()

    def test_execute_interactive(self):
        print(sys._getframe().f_code.co_name)
        kc = self.kc
        with capture_output() as io:
            reply = kc.execute_interactive("print('hello world')", timeout=TIMEOUT)
        assert 'hello world' in io.stdout
        print(io.stdout)
        assert reply['content']['status'] == 'ok'

client = myclient()
client.setUp()
client.test_comm_info()
client.test_execute_interactive()
client.stop()

python myclient.py

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • # Awesome Python [![Awesome](https://cdn.rawgit.com/sindr...
    emily_007阅读 2,228评论 0 3
  • # Python 资源大全中文版 我想很多程序员应该记得 GitHub 上有一个 Awesome - XXX 系列...
    小迈克阅读 3,063评论 1 3
  • # Python 资源大全中文版 我想很多程序员应该记得 GitHub 上有一个 Awesome - XXX 系列...
    aimaile阅读 26,599评论 6 427
  • afinalAfinal是一个android的ioc,orm框架 https://github.com/yangf...
    passiontim阅读 15,585评论 2 45
  • 那年,我23岁,初入社会,委屈只能自己受着。他出现在了我的生活中,用温柔给我温暖
    南荣阅读 233评论 0 0