会话

会话可以用于本地执行,也可以用于连接到一个本地集群或者一个现有的 Mars 集群。

如果会话没有被显式初始化,Mars将默认为本地执行创建一个会话。

>>> import mars.dataframe as md
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # will create a default session for local execution
   0  1
0  1  2
1  3  4
>>> df.fetch()
   0  1
0  1  2
1  3  4

new_session 可用于创建新的默认会话。

>>> import mars
>>> mars.new_session()  # create a new default session
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # execute on the session just created
   0  1
0  1  2
1  3  4
>>> df.fetch()  # fetch from the session just created
   0  1
0  1  2
1  3  4

会话可以作为参数显式指定给 execute 和 fetch。

>>> import mars
>>> import mars.tensor as mt
>>> sess = mars.new_session(default=False)
>>> t = mt.random.rand(3, 2)
>>> t.execute(session=sess)
array([[0.9956293 , 0.06604185],
       [0.25585635, 0.98183162],
       [0.04446616, 0.2417941 ]])
>>> t.fetch(session=sess)
array([[0.9956293 , 0.06604185],
       [0.25585635, 0.98183162],
       [0.04446616, 0.2417941 ]])

在会话上显式调用 .as_default() 会将该会话设置为默认会话,此后 .execute() 和 .fetch() 都将使用该默认会话。

每个会话都是独立的。对于在某个会话中执行过的 Mars 对象,在另一个会话中对其调用 .fetch() 将会失败。

>>> import mars
>>> sess = mars.new_session(default=False)
>>> df.fetch(session=sess)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-7-f10708ec743f> in <module>
----> 1 df.fetch(session=sess)

~/Workspace/mars/mars/dataframe/core.py in fetch(self, session, **kw)
    525             return self._fetch(session=session, **kw)
    526         else:
--> 527             batches = list(self._iter(batch_size=batch_size,
    528                                       session=session, **kw))
    529             return pd.concat(batches) if len(batches) > 1 else batches[0]

~/Workspace/mars/mars/dataframe/core.py in _iter(self, batch_size, session, **kw)
    509                 yield batch_data._fetch(session=session, **kw)
    510         else:
--> 511             yield self._fetch(session=session, **kw)
    512
    513     def iterbatch(self, batch_size=1000, session=None, **kw):

~/Workspace/mars/mars/core/entity/executable.py in _fetch(self, session, **kw)
    120         session = _get_session(self, session)
    121         self._check_session(session, 'fetch')
--> 122         return fetch(self, session=session, **kw)
    123
    124     def fetch(self, session: SessionType = None, **kw):

~/Workspace/mars/mars/deploy/oscar/session.py in fetch(tileable, session, *tileables, **kwargs)
   1391
   1392     session = _ensure_sync(session)
-> 1393     return session.fetch(tileable, *tileables, **kwargs)
   1394
   1395

~/Workspace/mars/mars/deploy/oscar/session.py in fetch(self, *tileables, **kwargs)
   1240     def fetch(self, *tileables, **kwargs) -> list:
   1241         coro = _fetch(*tileables, session=self._isolated_session, **kwargs)
-> 1242         return asyncio.run_coroutine_threadsafe(coro, self._loop).result()
   1243
   1244     @implements(AbstractSyncSession.decref)

~/miniconda3/envs/mars3.8/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    437                 raise CancelledError()
    438             elif self._state == FINISHED:
--> 439                 return self.__get_result()
    440             else:
    441                 raise TimeoutError()

~/miniconda3/envs/mars3.8/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/Workspace/mars/mars/deploy/oscar/session.py in _fetch(tileable, session, *tileables, **kwargs)
   1375         tileable, tileables = tileable[0], tileable[1:]
   1376     session = _get_isolated_session(session)
-> 1377     data = await session.fetch(tileable, *tileables, **kwargs)
   1378     return data[0] if len(tileables) == 0 else data
   1379

~/Workspace/mars/mars/deploy/oscar/session.py in fetch(self, *tileables, **kwargs)
    807             fetch_infos_list = []
    808             for tileable in tileables:
--> 809                 fetch_tileable, indexes = self._get_to_fetch_tileable(tileable)
    810                 chunk_to_slice = None
    811                 if indexes is not None:

~/Workspace/mars/mars/deploy/oscar/session.py in _get_to_fetch_tileable(self, tileable)
    751                 break
    752             else:
--> 753                 raise ValueError(f'Cannot fetch unexecuted '
    754                                  f'tileable: {tileable}')
    755

ValueError: Cannot fetch unexecuted tileable: DataFrame(op=DataFrameDataSource)

如果没有将 session 参数传递给 new_session,将会创建一个本地会话。

对于分布式执行,可以将 Web UI 的 URL 传递给 new_session 以连接到现有的集群。

>>> import mars
>>> mars.new_session('http://<web_ip>:<web_port>')
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # submit to Mars cluster
   0  1
0  1  2
1  3  4